-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala | 21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala | 26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala | 18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala | 37
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala | 8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala | 10
24 files changed, 80 insertions, 208 deletions
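
Taken as a whole, the commit replaces the per-input counters (numInputRows, numLeftRows, numRightRows) with a single numOutputRows metric per operator and pushes row counting down into the leaf scan nodes, so an operator's input count can be read from its child's output count. The sketch below illustrates that leaf-node pattern with a hypothetical ExampleScan operator that is not part of this diff; the helpers it relies on (SQLMetrics.createLongMetric, longMetric, LeafNode) are the ones visible in the hunks that follow, and it assumes the class sits in the org.apache.spark.sql.execution package.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.metric.SQLMetrics

// Hypothetical leaf scan, used only to illustrate the pattern applied below.
private[sql] case class ExampleScan(
    output: Seq[Attribute],
    rdd: RDD[InternalRow]) extends LeafNode {

  // One "number of output rows" counter per scan, as declared in PhysicalRDD below.
  private[sql] override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    // Bump the counter once per emitted row; downstream operators no longer
    // maintain their own numInputRows.
    rdd.map { r =>
      numOutputRows += 1
      r
    }
  }
}
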
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 92cfd5f841..cad7e25a32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
import org.apache.spark.sql.types.DataType
@@ -103,8 +104,11 @@ private[sql] case class PhysicalRDD(
override val outputPartitioning: Partitioning = UnknownPartitioning(0))
extends LeafNode {
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
protected override def doExecute(): RDD[InternalRow] = {
- if (isUnsafeRow) {
+ val unsafeRow = if (isUnsafeRow) {
rdd
} else {
rdd.mapPartitionsInternal { iter =>
@@ -112,6 +116,12 @@ private[sql] case class PhysicalRDD(
iter.map(proj)
}
}
+
+ val numOutputRows = longMetric("numOutputRows")
+ unsafeRow.map { r =>
+ numOutputRows += 1
+ r
+ }
}
override def simpleString: String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 59057bf966..f8aec9e7a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
@@ -29,6 +30,9 @@ private[sql] case class LocalTableScan(
output: Seq[Attribute],
rows: Seq[InternalRow]) extends LeafNode {
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
private val unsafeRows: Array[InternalRow] = {
val proj = UnsafeProjection.create(output, output)
rows.map(r => proj(r).copy()).toArray
@@ -36,7 +40,13 @@ private[sql] case class LocalTableScan(
private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
- protected override def doExecute(): RDD[InternalRow] = rdd
+ protected override def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+ rdd.map { r =>
+ numOutputRows += 1
+ r
+ }
+ }
override def executeCollect(): Array[InternalRow] = {
unsafeRows
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index 06a3991459..9fcfea8381 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -46,7 +46,6 @@ case class SortBasedAggregate(
AttributeSet(aggregateBufferAttributes)
override private[sql] lazy val metrics = Map(
- "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
@@ -68,7 +67,6 @@ case class SortBasedAggregate(
}
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
- val numInputRows = longMetric("numInputRows")
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsInternal { iter =>
// Because the constructor of an aggregation iterator will read at least the first row,
@@ -89,7 +87,6 @@ case class SortBasedAggregate(
resultExpressions,
(expressions, inputSchema) =>
newMutableProjection(expressions, inputSchema, subexpressionEliminationEnabled),
- numInputRows,
numOutputRows)
if (!hasInput && groupingExpressions.isEmpty) {
// There is no input and there is no grouping expressions.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 6501634ff9..8f974980bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -35,7 +35,6 @@ class SortBasedAggregationIterator(
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
- numInputRows: LongSQLMetric,
numOutputRows: LongSQLMetric)
extends AggregationIterator(
groupingExpressions,
@@ -97,7 +96,6 @@ class SortBasedAggregationIterator(
val inputRow = inputIterator.next()
nextGroupingKey = groupingProjection(inputRow).copy()
firstRowInNextGroup = inputRow.copy()
- numInputRows += 1
sortedInputHasNewGroup = true
} else {
// This inputIter is empty.
@@ -122,7 +120,6 @@ class SortBasedAggregationIterator(
// Get the grouping key.
val currentRow = inputIterator.next()
val groupingKey = groupingProjection(currentRow)
- numInputRows += 1
// Check if the current row belongs the current input row.
if (currentGroupingKey == groupingKey) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 340b8f78e5..a6950f805a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -47,7 +47,6 @@ case class TungstenAggregate(
require(TungstenAggregate.supportsAggregate(aggregateBufferAttributes))
override private[sql] lazy val metrics = Map(
- "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
@@ -77,7 +76,6 @@ case class TungstenAggregate(
}
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
- val numInputRows = longMetric("numInputRows")
val numOutputRows = longMetric("numOutputRows")
val dataSize = longMetric("dataSize")
val spillSize = longMetric("spillSize")
@@ -102,7 +100,6 @@ case class TungstenAggregate(
child.output,
iter,
testFallbackStartsAt,
- numInputRows,
numOutputRows,
dataSize,
spillSize)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 001e9c306a..c4f6594835 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -85,7 +85,6 @@ class TungstenAggregationIterator(
originalInputAttributes: Seq[Attribute],
inputIter: Iterator[InternalRow],
testFallbackStartsAt: Option[Int],
- numInputRows: LongSQLMetric,
numOutputRows: LongSQLMetric,
dataSize: LongSQLMetric,
spillSize: LongSQLMetric)
@@ -179,14 +178,12 @@ class TungstenAggregationIterator(
val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
while (inputIter.hasNext) {
val newInput = inputIter.next()
- numInputRows += 1
processRow(buffer, newInput)
}
} else {
var i = 0
while (inputIter.hasNext) {
val newInput = inputIter.next()
- numInputRows += 1
val groupingKey = groupingProjection.apply(newInput)
var buffer: UnsafeRow = null
if (i < fallbackStartsAt) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index f63e8a9b6d..949acb9aca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -29,9 +29,6 @@ import org.apache.spark.util.random.PoissonSampler
case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
extends UnaryNode with CodegenSupport {
- override private[sql] lazy val metrics = Map(
- "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
-
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
override def upstream(): RDD[InternalRow] = {
@@ -55,14 +52,10 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
}
protected override def doExecute(): RDD[InternalRow] = {
- val numRows = longMetric("numRows")
child.execute().mapPartitionsInternal { iter =>
val project = UnsafeProjection.create(projectList, child.output,
subexpressionEliminationEnabled)
- iter.map { row =>
- numRows += 1
- project(row)
- }
+ iter.map(project)
}
}
@@ -74,7 +67,6 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
override def output: Seq[Attribute] = child.output
private[sql] override lazy val metrics = Map(
- "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def upstream(): RDD[InternalRow] = {
@@ -104,12 +96,10 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
}
protected override def doExecute(): RDD[InternalRow] = {
- val numInputRows = longMetric("numInputRows")
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsInternal { iter =>
val predicate = newPredicate(condition, child.output)
iter.filter { row =>
- numInputRows += 1
val r = predicate(row)
if (r) numOutputRows += 1
r
@@ -135,9 +125,7 @@ case class Sample(
upperBound: Double,
withReplacement: Boolean,
seed: Long,
- child: SparkPlan)
- extends UnaryNode
-{
+ child: SparkPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
protected override def doExecute(): RDD[InternalRow] = {
@@ -163,6 +151,9 @@ case class Range(
output: Seq[Attribute])
extends LeafNode with CodegenSupport {
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def upstream(): RDD[InternalRow] = {
sqlContext.sparkContext.parallelize(0 until numSlices, numSlices).map(i => InternalRow(i))
}
@@ -241,6 +232,7 @@ case class Range(
}
protected override def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
sqlContext
.sparkContext
.parallelize(0 until numSlices, numSlices)
@@ -283,6 +275,7 @@ case class Range(
overflow = true
}
+ numOutputRows += 1
unsafeRow.setLong(0, ret)
unsafeRow
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
index 9084b74d1a..4858140229 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.UserDefinedType
import org.apache.spark.storage.StorageLevel
@@ -216,6 +217,9 @@ private[sql] case class InMemoryColumnarTableScan(
@transient relation: InMemoryRelation)
extends LeafNode {
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def output: Seq[Attribute] = attributes
// The cached version does not change the outputPartitioning of the original SparkPlan.
@@ -286,6 +290,8 @@ private[sql] case class InMemoryColumnarTableScan(
private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
protected override def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+
if (enableAccumulators) {
readPartitions.setValue(0)
readBatches.setValue(0)
@@ -332,12 +338,18 @@ private[sql] case class InMemoryColumnarTableScan(
cachedBatchIterator
}
+ // update SQL metrics
+ val withMetrics = cachedBatchesToScan.map { batch =>
+ numOutputRows += batch.numRows
+ batch
+ }
+
val columnTypes = requestedColumnDataTypes.map {
case udt: UserDefinedType[_] => udt.sqlType
case other => other
}.toArray
val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
- columnarIterator.initialize(cachedBatchesToScan, columnTypes, requestedColumnIndices.toArray)
+ columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray)
if (enableAccumulators && columnarIterator.hasNext) {
readPartitions += 1
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index cbd549763a..35c7963b48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -48,8 +48,6 @@ case class BroadcastHashJoin(
extends BinaryNode with HashJoin with CodegenSupport {
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
val timeout: Duration = {
@@ -70,11 +68,6 @@ case class BroadcastHashJoin(
// for the same query.
@transient
private lazy val broadcastFuture = {
- val numBuildRows = buildSide match {
- case BuildLeft => longMetric("numLeftRows")
- case BuildRight => longMetric("numRightRows")
- }
-
// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
Future {
@@ -84,7 +77,6 @@ case class BroadcastHashJoin(
// Note that we use .execute().collect() because we don't want to convert data to Scala
// types
val input: Array[InternalRow] = buildPlan.execute().map { row =>
- numBuildRows += 1
row.copy()
}.collect()
// The following line doesn't run in a job so we cannot track the metric value. However, we
@@ -93,10 +85,10 @@ case class BroadcastHashJoin(
// TODO: move this check into HashedRelation
val hashed = if (canJoinKeyFitWithinLong) {
LongHashedRelation(
- input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)
+ input.iterator, buildSideKeyGenerator, input.size)
} else {
HashedRelation(
- input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)
+ input.iterator, buildSideKeyGenerator, input.size)
}
sparkContext.broadcast(hashed)
}
@@ -108,10 +100,6 @@ case class BroadcastHashJoin(
}
protected override def doExecute(): RDD[InternalRow] = {
- val numStreamedRows = buildSide match {
- case BuildLeft => longMetric("numRightRows")
- case BuildRight => longMetric("numLeftRows")
- }
val numOutputRows = longMetric("numOutputRows")
val broadcastRelation = Await.result(broadcastFuture, timeout)
@@ -119,7 +107,7 @@ case class BroadcastHashJoin(
streamedPlan.execute().mapPartitions { streamedIter =>
val hashedRelation = broadcastRelation.value
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
- hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
+ hashJoin(streamedIter, hashedRelation, numOutputRows)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index ad3275696e..5e8c8ca043 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -44,8 +44,6 @@ case class BroadcastHashOuterJoin(
right: SparkPlan) extends BinaryNode with HashOuterJoin {
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
val timeout = {
@@ -66,14 +64,6 @@ case class BroadcastHashOuterJoin(
// for the same query.
@transient
private lazy val broadcastFuture = {
- val numBuildRows = joinType match {
- case RightOuter => longMetric("numLeftRows")
- case LeftOuter => longMetric("numRightRows")
- case x =>
- throw new IllegalArgumentException(
- s"HashOuterJoin should not take $x as the JoinType")
- }
-
// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
Future {
@@ -83,14 +73,9 @@ case class BroadcastHashOuterJoin(
// Note that we use .execute().collect() because we don't want to convert data to Scala
// types
val input: Array[InternalRow] = buildPlan.execute().map { row =>
- numBuildRows += 1
row.copy()
}.collect()
- // The following line doesn't run in a job so we cannot track the metric value. However, we
- // have already tracked it in the above lines. So here we can use
- // `SQLMetrics.nullLongMetric` to ignore it.
- val hashed = HashedRelation(
- input.iterator, SQLMetrics.nullLongMetric, buildKeyGenerator, input.size)
+ val hashed = HashedRelation(input.iterator, buildKeyGenerator, input.size)
sparkContext.broadcast(hashed)
}
}(BroadcastHashJoin.broadcastHashJoinExecutionContext)
@@ -101,13 +86,6 @@ case class BroadcastHashOuterJoin(
}
override def doExecute(): RDD[InternalRow] = {
- val numStreamedRows = joinType match {
- case RightOuter => longMetric("numRightRows")
- case LeftOuter => longMetric("numLeftRows")
- case x =>
- throw new IllegalArgumentException(
- s"HashOuterJoin should not take $x as the JoinType")
- }
val numOutputRows = longMetric("numOutputRows")
val broadcastRelation = Await.result(broadcastFuture, timeout)
@@ -122,7 +100,6 @@ case class BroadcastHashOuterJoin(
joinType match {
case LeftOuter =>
streamedIter.flatMap(currentRow => {
- numStreamedRows += 1
val rowKey = keyGenerator(currentRow)
joinedRow.withLeft(currentRow)
leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj, numOutputRows)
@@ -130,7 +107,6 @@ case class BroadcastHashOuterJoin(
case RightOuter =>
streamedIter.flatMap(currentRow => {
- numStreamedRows += 1
val rowKey = keyGenerator(currentRow)
joinedRow.withRight(currentRow)
rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj, numOutputRows)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index d0e18dfcf3..4f1cfd2e81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -36,36 +36,31 @@ case class BroadcastLeftSemiJoinHash(
condition: Option[Expression]) extends BinaryNode with HashSemiJoin {
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
protected override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
val numOutputRows = longMetric("numOutputRows")
val input = right.execute().map { row =>
- numRightRows += 1
row.copy()
}.collect()
if (condition.isEmpty) {
- val hashSet = buildKeyHashSet(input.toIterator, SQLMetrics.nullLongMetric)
+ val hashSet = buildKeyHashSet(input.toIterator)
val broadcastedRelation = sparkContext.broadcast(hashSet)
left.execute().mapPartitionsInternal { streamIter =>
- hashSemiJoin(streamIter, numLeftRows, broadcastedRelation.value, numOutputRows)
+ hashSemiJoin(streamIter, broadcastedRelation.value, numOutputRows)
}
} else {
val hashRelation =
- HashedRelation(input.toIterator, SQLMetrics.nullLongMetric, rightKeyGenerator, input.size)
+ HashedRelation(input.toIterator, rightKeyGenerator, input.size)
val broadcastedRelation = sparkContext.broadcast(hashRelation)
left.execute().mapPartitionsInternal { streamIter =>
val hashedRelation = broadcastedRelation.value
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
- hashSemiJoin(streamIter, numLeftRows, hashedRelation, numOutputRows)
+ hashSemiJoin(streamIter, hashedRelation, numOutputRows)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index e55f869478..4585cbda92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -36,8 +36,6 @@ case class BroadcastNestedLoopJoin(
// TODO: Override requiredChildDistribution.
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
/** BuildRight means the right relation <=> the broadcast relation. */
@@ -73,15 +71,10 @@ case class BroadcastNestedLoopJoin(
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
protected override def doExecute(): RDD[InternalRow] = {
- val (numStreamedRows, numBuildRows) = buildSide match {
- case BuildRight => (longMetric("numLeftRows"), longMetric("numRightRows"))
- case BuildLeft => (longMetric("numRightRows"), longMetric("numLeftRows"))
- }
val numOutputRows = longMetric("numOutputRows")
val broadcastedRelation =
sparkContext.broadcast(broadcast.execute().map { row =>
- numBuildRows += 1
row.copy()
}.collect().toIndexedSeq)
@@ -98,7 +91,6 @@ case class BroadcastNestedLoopJoin(
streamedIter.foreach { streamedRow =>
var i = 0
var streamRowMatched = false
- numStreamedRows += 1
while (i < broadcastedRelation.value.size) {
val broadcastedRow = broadcastedRelation.value(i)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 93d32e1fb9..e417079b61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -82,23 +82,13 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod
override def output: Seq[Attribute] = left.output ++ right.output
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
protected override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
val numOutputRows = longMetric("numOutputRows")
- val leftResults = left.execute().map { row =>
- numLeftRows += 1
- row.asInstanceOf[UnsafeRow]
- }
- val rightResults = right.execute().map { row =>
- numRightRows += 1
- row.asInstanceOf[UnsafeRow]
- }
+ val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]]
+ val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]]
val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size)
pair.mapPartitionsInternal { iter =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index ecbb1ac64b..332a748d3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -99,7 +99,6 @@ trait HashJoin {
protected def hashJoin(
streamIter: Iterator[InternalRow],
- numStreamRows: LongSQLMetric,
hashedRelation: HashedRelation,
numOutputRows: LongSQLMetric): Iterator[InternalRow] =
{
@@ -126,7 +125,6 @@ trait HashJoin {
// find the next match
while (currentHashMatches == null && streamIter.hasNext) {
currentStreamedRow = streamIter.next()
- numStreamRows += 1
val key = joinKeys(currentStreamedRow)
if (!key.anyNull) {
currentHashMatches = hashedRelation.get(key)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index 3e0f74cd98..0220e0b8a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -43,14 +43,13 @@ trait HashSemiJoin {
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
protected def buildKeyHashSet(
- buildIter: Iterator[InternalRow], numBuildRows: LongSQLMetric): java.util.Set[InternalRow] = {
+ buildIter: Iterator[InternalRow]): java.util.Set[InternalRow] = {
val hashSet = new java.util.HashSet[InternalRow]()
// Create a Hash set of buildKeys
val rightKey = rightKeyGenerator
while (buildIter.hasNext) {
val currentRow = buildIter.next()
- numBuildRows += 1
val rowKey = rightKey(currentRow)
if (!rowKey.anyNull) {
val keyExists = hashSet.contains(rowKey)
@@ -65,12 +64,10 @@ trait HashSemiJoin {
protected def hashSemiJoin(
streamIter: Iterator[InternalRow],
- numStreamRows: LongSQLMetric,
hashSet: java.util.Set[InternalRow],
numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
val joinKeys = leftKeyGenerator
streamIter.filter(current => {
- numStreamRows += 1
val key = joinKeys(current)
val r = !key.anyNull && hashSet.contains(key)
if (r) numOutputRows += 1
@@ -80,13 +77,11 @@ trait HashSemiJoin {
protected def hashSemiJoin(
streamIter: Iterator[InternalRow],
- numStreamRows: LongSQLMetric,
hashedRelation: HashedRelation,
numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
val joinKeys = leftKeyGenerator
val joinedRow = new JoinedRow
streamIter.filter { current =>
- numStreamRows += 1
val key = joinKeys(current)
lazy val rowBuffer = hashedRelation.get(key)
val r = !key.anyNull && rowBuffer != null && rowBuffer.exists {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index eb6930a14f..0978570d42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -159,18 +159,17 @@ private[joins] class UniqueKeyHashedRelation(
private[execution] object HashedRelation {
def apply(localNode: LocalNode, keyGenerator: Projection): HashedRelation = {
- apply(localNode.asIterator, SQLMetrics.nullLongMetric, keyGenerator)
+ apply(localNode.asIterator, keyGenerator)
}
def apply(
input: Iterator[InternalRow],
- numInputRows: LongSQLMetric,
keyGenerator: Projection,
sizeEstimate: Int = 64): HashedRelation = {
if (keyGenerator.isInstanceOf[UnsafeProjection]) {
return UnsafeHashedRelation(
- input, numInputRows, keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate)
+ input, keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate)
}
// TODO: Use Spark's HashMap implementation.
@@ -184,7 +183,6 @@ private[execution] object HashedRelation {
// Create a mapping of buildKeys -> rows
while (input.hasNext) {
currentRow = input.next()
- numInputRows += 1
val rowKey = keyGenerator(currentRow)
if (!rowKey.anyNull) {
val existingMatchList = hashTable.get(rowKey)
@@ -427,7 +425,6 @@ private[joins] object UnsafeHashedRelation {
def apply(
input: Iterator[InternalRow],
- numInputRows: LongSQLMetric,
keyGenerator: UnsafeProjection,
sizeEstimate: Int): HashedRelation = {
@@ -437,7 +434,6 @@ private[joins] object UnsafeHashedRelation {
// Create a mapping of buildKeys -> rows
while (input.hasNext) {
val unsafeRow = input.next().asInstanceOf[UnsafeRow]
- numInputRows += 1
val rowKey = keyGenerator(unsafeRow)
if (!rowKey.anyNull) {
val existingMatchList = hashTable.get(rowKey)
@@ -604,7 +600,6 @@ private[joins] object LongHashedRelation {
def apply(
input: Iterator[InternalRow],
- numInputRows: LongSQLMetric,
keyGenerator: Projection,
sizeEstimate: Int): HashedRelation = {
@@ -619,7 +614,6 @@ private[joins] object LongHashedRelation {
while (input.hasNext) {
val unsafeRow = input.next().asInstanceOf[UnsafeRow]
numFields = unsafeRow.numFields()
- numInputRows += 1
val rowKey = keyGenerator(unsafeRow)
if (!rowKey.anyNull) {
val key = rowKey.getLong(0)
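
With the LongSQLMetric parameter gone, every HashedRelation build site reduces to the (iterator, keyGenerator[, sizeEstimate]) shape. A minimal sketch of the resulting call, modeled on the updated HashedRelationSuite further down; keyProjection here stands in for whatever Projection extracts the join key and is an assumption of the sketch:

// Post-change factory call; sizeEstimate defaults to 64 when omitted.
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
val hashed = HashedRelation(data.iterator, keyProjection, data.length)
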
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index 82498ee395..ce758d63b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -34,8 +34,6 @@ case class LeftSemiJoinBNL(
// TODO: Override requiredChildDistribution.
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def outputPartitioning: Partitioning = streamed.outputPartitioning
@@ -52,13 +50,10 @@ case class LeftSemiJoinBNL(
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
protected override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
val numOutputRows = longMetric("numOutputRows")
val broadcastedRelation =
sparkContext.broadcast(broadcast.execute().map { row =>
- numRightRows += 1
row.copy()
}.collect().toIndexedSeq)
@@ -66,7 +61,6 @@ case class LeftSemiJoinBNL(
val joinedRow = new JoinedRow
streamedIter.filter(streamedRow => {
- numLeftRows += 1
var i = 0
var matched = false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index 25b3b5ca23..d8d3045ccf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -36,8 +36,6 @@ case class LeftSemiJoinHash(
condition: Option[Expression]) extends BinaryNode with HashSemiJoin {
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def outputPartitioning: Partitioning = left.outputPartitioning
@@ -46,17 +44,15 @@ case class LeftSemiJoinHash(
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
protected override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
val numOutputRows = longMetric("numOutputRows")
right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) =>
if (condition.isEmpty) {
- val hashSet = buildKeyHashSet(buildIter, numRightRows)
- hashSemiJoin(streamIter, numLeftRows, hashSet, numOutputRows)
+ val hashSet = buildKeyHashSet(buildIter)
+ hashSemiJoin(streamIter, hashSet, numOutputRows)
} else {
- val hashRelation = HashedRelation(buildIter, numRightRows, rightKeyGenerator)
- hashSemiJoin(streamIter, numLeftRows, hashRelation, numOutputRows)
+ val hashRelation = HashedRelation(buildIter, rightKeyGenerator)
+ hashSemiJoin(streamIter, hashRelation, numOutputRows)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 322a954b4f..cd8a5670e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -37,8 +37,6 @@ case class SortMergeJoin(
right: SparkPlan) extends BinaryNode {
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def output: Seq[Attribute] = left.output ++ right.output
@@ -60,8 +58,6 @@ case class SortMergeJoin(
}
protected override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
val numOutputRows = longMetric("numOutputRows")
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
@@ -89,9 +85,7 @@ case class SortMergeJoin(
rightKeyGenerator,
keyOrdering,
RowIterator.fromScala(leftIter),
- numLeftRows,
- RowIterator.fromScala(rightIter),
- numRightRows
+ RowIterator.fromScala(rightIter)
)
private[this] val joinRow = new JoinedRow
private[this] val resultProjection: (InternalRow) => InternalRow =
@@ -157,9 +151,7 @@ private[joins] class SortMergeJoinScanner(
bufferedKeyGenerator: Projection,
keyOrdering: Ordering[InternalRow],
streamedIter: RowIterator,
- numStreamedRows: LongSQLMetric,
- bufferedIter: RowIterator,
- numBufferedRows: LongSQLMetric) {
+ bufferedIter: RowIterator) {
private[this] var streamedRow: InternalRow = _
private[this] var streamedRowKey: InternalRow = _
private[this] var bufferedRow: InternalRow = _
@@ -284,7 +276,6 @@ private[joins] class SortMergeJoinScanner(
if (streamedIter.advanceNext()) {
streamedRow = streamedIter.getRow
streamedRowKey = streamedKeyGenerator(streamedRow)
- numStreamedRows += 1
true
} else {
streamedRow = null
@@ -302,7 +293,6 @@ private[joins] class SortMergeJoinScanner(
while (!foundRow && bufferedIter.advanceNext()) {
bufferedRow = bufferedIter.getRow
bufferedRowKey = bufferedKeyGenerator(bufferedRow)
- numBufferedRows += 1
foundRow = !bufferedRowKey.anyNull
}
if (!foundRow) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
index ed41ad2a00..40a6c93b59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
@@ -40,8 +40,6 @@ case class SortMergeOuterJoin(
right: SparkPlan) extends BinaryNode {
override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def output: Seq[Attribute] = {
@@ -96,8 +94,6 @@ case class SortMergeOuterJoin(
UnsafeProjection.create(rightKeys, right.output)
override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
val numOutputRows = longMetric("numOutputRows")
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
@@ -119,9 +115,7 @@ case class SortMergeOuterJoin(
bufferedKeyGenerator = createRightKeyGenerator(),
keyOrdering,
streamedIter = RowIterator.fromScala(leftIter),
- numLeftRows,
- bufferedIter = RowIterator.fromScala(rightIter),
- numRightRows
+ bufferedIter = RowIterator.fromScala(rightIter)
)
val rightNullRow = new GenericInternalRow(right.output.length)
new LeftOuterIterator(
@@ -133,9 +127,7 @@ case class SortMergeOuterJoin(
bufferedKeyGenerator = createLeftKeyGenerator(),
keyOrdering,
streamedIter = RowIterator.fromScala(rightIter),
- numRightRows,
- bufferedIter = RowIterator.fromScala(leftIter),
- numLeftRows
+ bufferedIter = RowIterator.fromScala(leftIter)
)
val leftNullRow = new GenericInternalRow(left.output.length)
new RightOuterIterator(
@@ -149,9 +141,7 @@ case class SortMergeOuterJoin(
rightKeyGenerator = createRightKeyGenerator(),
keyOrdering,
leftIter = RowIterator.fromScala(leftIter),
- numLeftRows,
rightIter = RowIterator.fromScala(rightIter),
- numRightRows,
boundCondition,
leftNullRow,
rightNullRow)
@@ -289,9 +279,7 @@ private class SortMergeFullOuterJoinScanner(
rightKeyGenerator: Projection,
keyOrdering: Ordering[InternalRow],
leftIter: RowIterator,
- numLeftRows: LongSQLMetric,
rightIter: RowIterator,
- numRightRows: LongSQLMetric,
boundCondition: InternalRow => Boolean,
leftNullRow: InternalRow,
rightNullRow: InternalRow) {
@@ -321,7 +309,6 @@ private class SortMergeFullOuterJoinScanner(
if (leftIter.advanceNext()) {
leftRow = leftIter.getRow
leftRowKey = leftKeyGenerator(leftRow)
- numLeftRows += 1
true
} else {
leftRow = null
@@ -338,7 +325,6 @@ private class SortMergeFullOuterJoinScanner(
if (rightIter.advanceNext()) {
rightRow = rightIter.getRow
rightRowKey = rightKeyGenerator(rightRow)
- numRightRows += 1
true
} else {
rightRow = null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index f985dfbd8a..04dd809df1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -36,8 +36,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
test("GeneralHashedRelation") {
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
- val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data")
- val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
+ val hashed = HashedRelation(data.iterator, keyProjection)
assert(hashed.isInstanceOf[GeneralHashedRelation])
assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0)))
@@ -47,13 +46,11 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
val data2 = CompactBuffer[InternalRow](data(2))
data2 += data(2)
assert(hashed.get(data(2)) === data2)
- assert(numDataRows.value.value === data.length)
}
test("UniqueKeyHashedRelation") {
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2))
- val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data")
- val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
+ val hashed = HashedRelation(data.iterator, keyProjection)
assert(hashed.isInstanceOf[UniqueKeyHashedRelation])
assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0)))
@@ -66,19 +63,17 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
assert(uniqHashed.getValue(data(1)) === data(1))
assert(uniqHashed.getValue(data(2)) === data(2))
assert(uniqHashed.getValue(InternalRow(10)) === null)
- assert(numDataRows.value.value === data.length)
}
test("UnsafeHashedRelation") {
val schema = StructType(StructField("a", IntegerType, true) :: Nil)
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
- val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data")
val toUnsafe = UnsafeProjection.create(schema)
val unsafeData = data.map(toUnsafe(_).copy()).toArray
val buildKey = Seq(BoundReference(0, IntegerType, false))
val keyGenerator = UnsafeProjection.create(buildKey)
- val hashed = UnsafeHashedRelation(unsafeData.iterator, numDataRows, keyGenerator, 1)
+ val hashed = UnsafeHashedRelation(unsafeData.iterator, keyGenerator, 1)
assert(hashed.isInstanceOf[UnsafeHashedRelation])
assert(hashed.get(unsafeData(0)) === CompactBuffer[InternalRow](unsafeData(0)))
@@ -100,7 +95,6 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
assert(hashed2.get(unsafeData(1)) === CompactBuffer[InternalRow](unsafeData(1)))
assert(hashed2.get(toUnsafe(InternalRow(10))) === null)
assert(hashed2.get(unsafeData(2)) === data2)
- assert(numDataRows.value.value === data.length)
val os2 = new ByteArrayOutputStream()
val out2 = new ObjectOutputStream(os2)
@@ -139,7 +133,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
Seq(BoundReference(0, IntegerType, false), BoundReference(1, IntegerType, true)))
val rows = (0 until 100).map(i => unsafeProj(InternalRow(i, i + 1)).copy())
val keyProj = UnsafeProjection.create(Seq(BoundReference(0, IntegerType, false)))
- val longRelation = LongHashedRelation(rows.iterator, SQLMetrics.nullLongMetric, keyProj, 100)
+ val longRelation = LongHashedRelation(rows.iterator, keyProj, 100)
assert(longRelation.isInstanceOf[LongArrayRelation])
val longArrayRelation = longRelation.asInstanceOf[LongArrayRelation]
(0 until 100).foreach { i =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 2260e48702..d24625a535 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -113,23 +113,12 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
}
}
- test("Project metrics") {
- // Assume the execution plan is
- // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
- val df = person.select('name)
- testSparkPlanMetrics(df, 1, Map(
- 0L -> ("Project", Map(
- "number of rows" -> 2L)))
- )
- }
-
test("Filter metrics") {
// Assume the execution plan is
// PhysicalRDD(nodeId = 1) -> Filter(nodeId = 0)
val df = person.filter('age < 25)
testSparkPlanMetrics(df, 1, Map(
0L -> ("Filter", Map(
- "number of input rows" -> 2L,
"number of output rows" -> 1L)))
)
}
@@ -149,10 +138,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = testData2.groupBy().count() // 2 partitions
testSparkPlanMetrics(df, 1, Map(
2L -> ("TungstenAggregate", Map(
- "number of input rows" -> 6L,
"number of output rows" -> 2L)),
0L -> ("TungstenAggregate", Map(
- "number of input rows" -> 2L,
"number of output rows" -> 1L)))
)
@@ -160,10 +147,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df2 = testData2.groupBy('a).count()
testSparkPlanMetrics(df2, 1, Map(
2L -> ("TungstenAggregate", Map(
- "number of input rows" -> 6L,
"number of output rows" -> 4L)),
0L -> ("TungstenAggregate", Map(
- "number of input rows" -> 4L,
"number of output rows" -> 3L)))
)
}
@@ -181,8 +166,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
testSparkPlanMetrics(df, 1, Map(
1L -> ("SortMergeJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
- "number of left rows" -> 4L,
- "number of right rows" -> 2L,
"number of output rows" -> 4L)))
)
}
@@ -201,8 +184,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
testSparkPlanMetrics(df, 1, Map(
1L -> ("SortMergeOuterJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
- "number of left rows" -> 6L,
- "number of right rows" -> 2L,
"number of output rows" -> 8L)))
)
@@ -211,8 +192,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
testSparkPlanMetrics(df2, 1, Map(
1L -> ("SortMergeOuterJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
- "number of left rows" -> 2L,
- "number of right rows" -> 6L,
"number of output rows" -> 8L)))
)
}
@@ -226,8 +205,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = df1.join(broadcast(df2), "key")
testSparkPlanMetrics(df, 2, Map(
1L -> ("BroadcastHashJoin", Map(
- "number of left rows" -> 2L,
- "number of right rows" -> 4L,
"number of output rows" -> 2L)))
)
}
@@ -240,16 +217,12 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
testSparkPlanMetrics(df, 2, Map(
0L -> ("BroadcastHashOuterJoin", Map(
- "number of left rows" -> 3L,
- "number of right rows" -> 4L,
"number of output rows" -> 5L)))
)
val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
testSparkPlanMetrics(df3, 2, Map(
0L -> ("BroadcastHashOuterJoin", Map(
- "number of left rows" -> 3L,
- "number of right rows" -> 4L,
"number of output rows" -> 6L)))
)
}
@@ -265,8 +238,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
"testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a")
testSparkPlanMetrics(df, 3, Map(
1L -> ("BroadcastNestedLoopJoin", Map(
- "number of left rows" -> 12L, // left needs to be scanned twice
- "number of right rows" -> 2L,
"number of output rows" -> 12L)))
)
}
@@ -280,8 +251,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
testSparkPlanMetrics(df, 2, Map(
0L -> ("BroadcastLeftSemiJoinHash", Map(
- "number of left rows" -> 2L,
- "number of right rows" -> 4L,
"number of output rows" -> 2L)))
)
}
@@ -295,8 +264,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = df1.join(df2, $"key" === $"key2", "leftsemi")
testSparkPlanMetrics(df, 1, Map(
0L -> ("LeftSemiJoinHash", Map(
- "number of left rows" -> 2L,
- "number of right rows" -> 4L,
"number of output rows" -> 2L)))
)
}
@@ -310,8 +277,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = df1.join(df2, $"key" < $"key2", "leftsemi")
testSparkPlanMetrics(df, 2, Map(
0L -> ("LeftSemiJoinBNL", Map(
- "number of left rows" -> 2L,
- "number of right rows" -> 4L,
"number of output rows" -> 2L)))
)
}
@@ -326,8 +291,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
"SELECT * FROM testData2 JOIN testDataForJoin")
testSparkPlanMetrics(df, 1, Map(
1L -> ("CartesianProduct", Map(
- "number of left rows" -> 12L, // left needs to be scanned twice
- "number of right rows" -> 4L, // right is read twice
"number of output rows" -> 12L)))
)
}
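
On the test side, each expected-metrics map now carries only the output-row entry; for example, the SortMergeJoin assertion above reduces to a single key (condensed from the hunk, not new behavior):

testSparkPlanMetrics(df, 1, Map(
  1L -> ("SortMergeJoin", Map("number of output rows" -> 4L))))
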
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index a3e5243b68..d3191d3aea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -92,7 +92,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
- metrics += qe.executedPlan.longMetric("numInputRows").value.value
+ metrics += qe.executedPlan.longMetric("numOutputRows").value.value
}
}
sqlContext.listenerManager.register(listener)
@@ -105,9 +105,9 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
}
assert(metrics.length == 3)
- assert(metrics(0) == 1)
- assert(metrics(1) == 1)
- assert(metrics(2) == 2)
+ assert(metrics(0) === 1)
+ assert(metrics(1) === 1)
+ assert(metrics(2) === 2)
sqlContext.listenerManager.unregister(listener)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index eff8833e92..235b80b7c6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.{BooleanType, DataType}
import org.apache.spark.util.Utils
@@ -52,6 +53,9 @@ case class HiveTableScan(
require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
"Partition pruning predicates only supported for partitioned tables.")
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def producedAttributes: AttributeSet = outputSet ++
AttributeSet(partitionPruningPred.flatMap(_.references))
@@ -146,9 +150,13 @@ case class HiveTableScan(
prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
}
}
+ val numOutputRows = longMetric("numOutputRows")
rdd.mapPartitionsInternal { iter =>
val proj = UnsafeProjection.create(schema)
- iter.map(proj)
+ iter.map { r =>
+ numOutputRows += 1
+ proj(r)
+ }
}
}