Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala  11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala  25
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala  18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala  11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala  36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala  30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala  40
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala  24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala  27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala  25
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala  30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala  23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala  19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala  18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala  16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala  29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala  21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala  38
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala  14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala  450
27 files changed, 847 insertions, 107 deletions
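
Before the per-file diffs, here is a minimal sketch of the pattern this patch applies to every operator: declare a metrics map with SQLMetrics.createLongMetric, pull each metric into a local val inside doExecute(), and increment it per row. MyOperator is a hypothetical operator used only for illustration; the imports and method signatures mirror the ones visible in the diff below.

    // Sketch only: the per-operator metrics pattern introduced by this patch.
    // MyOperator is hypothetical; it is assumed to live under org.apache.spark.sql,
    // as the real operators in this diff do, so the private[sql] override compiles.
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
    import org.apache.spark.sql.execution.metric.SQLMetrics

    case class MyOperator(child: SparkPlan) extends UnaryNode {
      override def output: Seq[Attribute] = child.output

      // Each operator now declares its own metrics; the generic
      // SparkPlan.trackNumOfRowsEnabled flag is removed by this patch.
      override private[sql] lazy val metrics = Map(
        "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
        "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

      protected override def doExecute(): RDD[InternalRow] = {
        // Fetch the metrics into local vals before the closure so that only the
        // accumulator-backed metric objects are captured, not the whole plan.
        val numInputRows = longMetric("numInputRows")
        val numOutputRows = longMetric("numOutputRows")
        child.execute().mapPartitions { iter =>
          iter.map { row =>
            numInputRows += 1
            numOutputRows += 1
            row
          }
        }
      }
    }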
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index e8c6a0f8f8..f3b6a3a5f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -45,6 +46,10 @@ case class Aggregate(
child: SparkPlan)
extends UnaryNode {
+ override private[sql] lazy val metrics = Map(
+ "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def requiredChildDistribution: List[Distribution] = {
if (partial) {
UnspecifiedDistribution :: Nil
@@ -121,12 +126,15 @@ case class Aggregate(
}
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
+ val numInputRows = longMetric("numInputRows")
+ val numOutputRows = longMetric("numOutputRows")
if (groupingExpressions.isEmpty) {
child.execute().mapPartitions { iter =>
val buffer = newAggregateBuffer()
var currentRow: InternalRow = null
while (iter.hasNext) {
currentRow = iter.next()
+ numInputRows += 1
var i = 0
while (i < buffer.length) {
buffer(i).update(currentRow)
@@ -142,6 +150,7 @@ case class Aggregate(
i += 1
}
+ numOutputRows += 1
Iterator(resultProjection(aggregateResults))
}
} else {
@@ -152,6 +161,7 @@ case class Aggregate(
var currentRow: InternalRow = null
while (iter.hasNext) {
currentRow = iter.next()
+ numInputRows += 1
val currentGroup = groupingProjection(currentRow)
var currentBuffer = hashTable.get(currentGroup)
if (currentBuffer == null) {
@@ -180,6 +190,7 @@ case class Aggregate(
val currentEntry = hashTableIter.next()
val currentGroup = currentEntry.getKey
val currentBuffer = currentEntry.getValue
+ numOutputRows += 1
var i = 0
while (i < currentBuffer.length) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index cae7ca5cbd..abb60cf12e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -99,8 +99,6 @@ private[sql] case class PhysicalRDD(
rdd: RDD[InternalRow],
extraInformation: String) extends LeafNode {
- override protected[sql] val trackNumOfRowsEnabled = true
-
protected override def doExecute(): RDD[InternalRow] = rdd
override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 858dd85fd1..34e926e458 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -30,8 +30,6 @@ private[sql] case class LocalTableScan(
output: Seq[Attribute],
rows: Seq[InternalRow]) extends LeafNode {
- override protected[sql] val trackNumOfRowsEnabled = true
-
private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
protected override def doExecute(): RDD[InternalRow] = rdd
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 9ba5cf2d2b..72f5450510 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -81,22 +81,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}
/**
- * Whether track the number of rows output by this SparkPlan
- */
- protected[sql] def trackNumOfRowsEnabled: Boolean = false
-
- private lazy val defaultMetrics: Map[String, SQLMetric[_, _]] =
- if (trackNumOfRowsEnabled) {
- Map("numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
- }
- else {
- Map.empty
- }
-
- /**
* Return all metrics containing metrics of this SparkPlan.
*/
- private[sql] def metrics: Map[String, SQLMetric[_, _]] = defaultMetrics
+ private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty
/**
* Return a LongSQLMetric according to the name.
@@ -150,15 +137,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
prepare()
- if (trackNumOfRowsEnabled) {
- val numRows = longMetric("numRows")
- doExecute().map { row =>
- numRows += 1
- row
- }
- } else {
- doExecute()
- }
+ doExecute()
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index ad428ad663..ab26f9c58a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, ClusteredDistribution, AllTuples, Distribution}
import org.apache.spark.sql.execution.{UnsafeFixedWidthAggregationMap, SparkPlan, UnaryNode}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType
case class SortBasedAggregate(
@@ -38,6 +39,10 @@ case class SortBasedAggregate(
child: SparkPlan)
extends UnaryNode {
+ override private[sql] lazy val metrics = Map(
+ "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def outputsUnsafeRows: Boolean = false
override def canProcessUnsafeRows: Boolean = false
@@ -63,6 +68,8 @@ case class SortBasedAggregate(
}
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
+ val numInputRows = longMetric("numInputRows")
+ val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitions { iter =>
// Because the constructor of an aggregation iterator will read at least the first row,
// we need to get the value of iter.hasNext first.
@@ -84,10 +91,13 @@ case class SortBasedAggregate(
newProjection _,
child.output,
iter,
- outputsUnsafeRows)
+ outputsUnsafeRows,
+ numInputRows,
+ numOutputRows)
if (!hasInput && groupingExpressions.isEmpty) {
// There is no input and there is no grouping expressions.
// We need to output a single row as the output.
+ numOutputRows += 1
Iterator[InternalRow](outputIter.outputForEmptyGroupingKeyWithoutInput())
} else {
outputIter
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 67ebafde25..73d50e07cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2, AggregateFunction2}
+import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.unsafe.KVIterator
/**
@@ -37,7 +38,9 @@ class SortBasedAggregationIterator(
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
- outputsUnsafeRows: Boolean)
+ outputsUnsafeRows: Boolean,
+ numInputRows: LongSQLMetric,
+ numOutputRows: LongSQLMetric)
extends AggregationIterator(
groupingKeyAttributes,
valueAttributes,
@@ -103,6 +106,7 @@ class SortBasedAggregationIterator(
// Get the grouping key.
val groupingKey = inputKVIterator.getKey
val currentRow = inputKVIterator.getValue
+ numInputRows += 1
// Check if the current row belongs the current input row.
if (currentGroupingKey == groupingKey) {
@@ -137,7 +141,7 @@ class SortBasedAggregationIterator(
val outputRow = generateOutput(currentGroupingKey, sortBasedAggregationBuffer)
// Initialize buffer values for the next group.
initializeBuffer(sortBasedAggregationBuffer)
-
+ numOutputRows += 1
outputRow
} else {
// no more result
@@ -151,7 +155,7 @@ class SortBasedAggregationIterator(
nextGroupingKey = inputKVIterator.getKey().copy()
firstRowInNextGroup = inputKVIterator.getValue().copy()
-
+ numInputRows += 1
sortedInputHasNewGroup = true
} else {
// This inputIter is empty.
@@ -181,7 +185,9 @@ object SortBasedAggregationIterator {
newProjection: (Seq[Expression], Seq[Attribute]) => Projection,
inputAttributes: Seq[Attribute],
inputIter: Iterator[InternalRow],
- outputsUnsafeRows: Boolean): SortBasedAggregationIterator = {
+ outputsUnsafeRows: Boolean,
+ numInputRows: LongSQLMetric,
+ numOutputRows: LongSQLMetric): SortBasedAggregationIterator = {
val kvIterator = if (UnsafeProjection.canSupport(groupingExprs)) {
AggregationIterator.unsafeKVIterator(
groupingExprs,
@@ -202,7 +208,9 @@ object SortBasedAggregationIterator {
initialInputBufferOffset,
resultExpressions,
newMutableProjection,
- outputsUnsafeRows)
+ outputsUnsafeRows,
+ numInputRows,
+ numOutputRows)
}
// scalastyle:on
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 1694794a53..6b5935a7ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, ClusteredDistribution, AllTuples, Distribution}
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
case class TungstenAggregate(
requiredChildDistributionExpressions: Option[Seq[Expression]],
@@ -35,6 +36,10 @@ case class TungstenAggregate(
child: SparkPlan)
extends UnaryNode {
+ override private[sql] lazy val metrics = Map(
+ "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def outputsUnsafeRows: Boolean = true
override def canProcessUnsafeRows: Boolean = true
@@ -61,6 +66,8 @@ case class TungstenAggregate(
}
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
+ val numInputRows = longMetric("numInputRows")
+ val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitions { iter =>
val hasInput = iter.hasNext
if (!hasInput && groupingExpressions.nonEmpty) {
@@ -78,9 +85,12 @@ case class TungstenAggregate(
newMutableProjection,
child.output,
iter,
- testFallbackStartsAt)
+ testFallbackStartsAt,
+ numInputRows,
+ numOutputRows)
if (!hasInput && groupingExpressions.isEmpty) {
+ numOutputRows += 1
Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
} else {
aggregationIterator
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 32160906c3..1f383dd044 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{UnsafeKVExternalSorter, UnsafeFixedWidthAggregationMap}
+import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.sql.types.StructType
/**
@@ -83,7 +84,9 @@ class TungstenAggregationIterator(
newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
originalInputAttributes: Seq[Attribute],
inputIter: Iterator[InternalRow],
- testFallbackStartsAt: Option[Int])
+ testFallbackStartsAt: Option[Int],
+ numInputRows: LongSQLMetric,
+ numOutputRows: LongSQLMetric)
extends Iterator[UnsafeRow] with Logging {
///////////////////////////////////////////////////////////////////////////
@@ -352,6 +355,7 @@ class TungstenAggregationIterator(
private def processInputs(): Unit = {
while (!sortBased && inputIter.hasNext) {
val newInput = inputIter.next()
+ numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
if (buffer == null) {
@@ -371,6 +375,7 @@ class TungstenAggregationIterator(
var i = 0
while (!sortBased && inputIter.hasNext) {
val newInput = inputIter.next()
+ numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
val buffer: UnsafeRow = if (i < fallbackStartsAt) {
hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
@@ -439,6 +444,7 @@ class TungstenAggregationIterator(
// Process the rest of input rows.
while (inputIter.hasNext) {
val newInput = inputIter.next()
+ numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
buffer.copyFrom(initialAggregationBuffer)
processRow(buffer, newInput)
@@ -462,6 +468,7 @@ class TungstenAggregationIterator(
// Insert the rest of input rows.
while (inputIter.hasNext) {
val newInput = inputIter.next()
+ numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
bufferExtractor(newInput)
externalSorter.insertKV(groupingKey, buffer)
@@ -657,7 +664,7 @@ class TungstenAggregationIterator(
TaskContext.get().internalMetricsToAccumulators(
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemory)
}
-
+ numOutputRows += 1
res
} else {
// no more result
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index bf2de244c8..247c900baa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -41,11 +41,20 @@ import org.apache.spark.{HashPartitioner, SparkEnv}
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+ override private[sql] lazy val metrics = Map(
+ "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
+
@transient lazy val buildProjection = newMutableProjection(projectList, child.output)
- protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
- val reusableProjection = buildProjection()
- iter.map(reusableProjection)
+ protected override def doExecute(): RDD[InternalRow] = {
+ val numRows = longMetric("numRows")
+ child.execute().mapPartitions { iter =>
+ val reusableProjection = buildProjection()
+ iter.map { row =>
+ numRows += 1
+ reusableProjection(row)
+ }
+ }
}
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
@@ -57,19 +66,28 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
*/
case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
+ override private[sql] lazy val metrics = Map(
+ "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
+
override def outputsUnsafeRows: Boolean = true
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
- protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
- this.transformAllExpressions {
- case CreateStruct(children) => CreateStructUnsafe(children)
- case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
+ protected override def doExecute(): RDD[InternalRow] = {
+ val numRows = longMetric("numRows")
+ child.execute().mapPartitions { iter =>
+ this.transformAllExpressions {
+ case CreateStruct(children) => CreateStructUnsafe(children)
+ case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
+ }
+ val project = UnsafeProjection.create(projectList, child.output)
+ iter.map { row =>
+ numRows += 1
+ project(row)
+ }
}
- val project = UnsafeProjection.create(projectList, child.output)
- iter.map(project)
}
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index f7a68e4f5d..2e108cb814 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.ThreadUtils
import org.apache.spark.{InternalAccumulator, TaskContext}
@@ -45,7 +46,10 @@ case class BroadcastHashJoin(
right: SparkPlan)
extends BinaryNode with HashJoin {
- override protected[sql] val trackNumOfRowsEnabled = true
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
val timeout: Duration = {
val timeoutValue = sqlContext.conf.broadcastTimeout
@@ -65,6 +69,11 @@ case class BroadcastHashJoin(
// for the same query.
@transient
private lazy val broadcastFuture = {
+ val numBuildRows = buildSide match {
+ case BuildLeft => longMetric("numLeftRows")
+ case BuildRight => longMetric("numRightRows")
+ }
+
// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
future {
@@ -73,8 +82,15 @@ case class BroadcastHashJoin(
SQLExecution.withExecutionId(sparkContext, executionId) {
// Note that we use .execute().collect() because we don't want to convert data to Scala
// types
- val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
- val hashed = HashedRelation(input.iterator, buildSideKeyGenerator, input.size)
+ val input: Array[InternalRow] = buildPlan.execute().map { row =>
+ numBuildRows += 1
+ row.copy()
+ }.collect()
+ // The following line doesn't run in a job so we cannot track the metric value. However, we
+ // have already tracked it in the above lines. So here we can use
+ // `SQLMetrics.nullLongMetric` to ignore it.
+ val hashed = HashedRelation(
+ input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)
sparkContext.broadcast(hashed)
}
}(BroadcastHashJoin.broadcastHashJoinExecutionContext)
@@ -85,6 +101,12 @@ case class BroadcastHashJoin(
}
protected override def doExecute(): RDD[InternalRow] = {
+ val numStreamedRows = buildSide match {
+ case BuildLeft => longMetric("numRightRows")
+ case BuildRight => longMetric("numLeftRows")
+ }
+ val numOutputRows = longMetric("numOutputRows")
+
val broadcastRelation = Await.result(broadcastFuture, timeout)
streamedPlan.execute().mapPartitions { streamedIter =>
@@ -95,7 +117,7 @@ case class BroadcastHashJoin(
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
case _ =>
}
- hashJoin(streamedIter, hashedRelation)
+ hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
}
}
}
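
One detail worth noting from the BroadcastHashJoin change above: the build-side rows are counted while the build plan is collected (which runs inside a Spark job), and the driver-side HashedRelation construction is then given SQLMetrics.nullLongMetric, so no rows are counted twice and no metric is updated outside a job. A hedged restatement of that pattern, assuming buildPlan, buildSideKeyGenerator and numBuildRows are in scope as in the diff:

    // Count build rows inside the job that collects them...
    val input: Array[InternalRow] = buildPlan.execute().map { row =>
      numBuildRows += 1
      row.copy()
    }.collect()
    // ...then build the relation on the driver with a no-op metric, since this
    // step does not run in a job and the rows were already counted above.
    val hashed = HashedRelation(
      input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)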
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index a3626de49a..69a8b95eaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.{InternalAccumulator, TaskContext}
/**
@@ -45,6 +46,11 @@ case class BroadcastHashOuterJoin(
left: SparkPlan,
right: SparkPlan) extends BinaryNode with HashOuterJoin {
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
val timeout = {
val timeoutValue = sqlContext.conf.broadcastTimeout
if (timeoutValue < 0) {
@@ -63,6 +69,14 @@ case class BroadcastHashOuterJoin(
// for the same query.
@transient
private lazy val broadcastFuture = {
+ val numBuildRows = joinType match {
+ case RightOuter => longMetric("numLeftRows")
+ case LeftOuter => longMetric("numRightRows")
+ case x =>
+ throw new IllegalArgumentException(
+ s"HashOuterJoin should not take $x as the JoinType")
+ }
+
// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
future {
@@ -71,8 +85,15 @@ case class BroadcastHashOuterJoin(
SQLExecution.withExecutionId(sparkContext, executionId) {
// Note that we use .execute().collect() because we don't want to convert data to Scala
// types
- val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
- val hashed = HashedRelation(input.iterator, buildKeyGenerator, input.size)
+ val input: Array[InternalRow] = buildPlan.execute().map { row =>
+ numBuildRows += 1
+ row.copy()
+ }.collect()
+ // The following line doesn't run in a job so we cannot track the metric value. However, we
+ // have already tracked it in the above lines. So here we can use
+ // `SQLMetrics.nullLongMetric` to ignore it.
+ val hashed = HashedRelation(
+ input.iterator, SQLMetrics.nullLongMetric, buildKeyGenerator, input.size)
sparkContext.broadcast(hashed)
}
}(BroadcastHashJoin.broadcastHashJoinExecutionContext)
@@ -83,6 +104,15 @@ case class BroadcastHashOuterJoin(
}
override def doExecute(): RDD[InternalRow] = {
+ val numStreamedRows = joinType match {
+ case RightOuter => longMetric("numRightRows")
+ case LeftOuter => longMetric("numLeftRows")
+ case x =>
+ throw new IllegalArgumentException(
+ s"HashOuterJoin should not take $x as the JoinType")
+ }
+ val numOutputRows = longMetric("numOutputRows")
+
val broadcastRelation = Await.result(broadcastFuture, timeout)
streamedPlan.execute().mapPartitions { streamedIter =>
@@ -101,16 +131,18 @@ case class BroadcastHashOuterJoin(
joinType match {
case LeftOuter =>
streamedIter.flatMap(currentRow => {
+ numStreamedRows += 1
val rowKey = keyGenerator(currentRow)
joinedRow.withLeft(currentRow)
- leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj)
+ leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj, numOutputRows)
})
case RightOuter =>
streamedIter.flatMap(currentRow => {
+ numStreamedRows += 1
val rowKey = keyGenerator(currentRow)
joinedRow.withRight(currentRow)
- rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj)
+ rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj, numOutputRows)
})
case x =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index 5bd06fbdca..78a8c16c62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -37,18 +38,31 @@ case class BroadcastLeftSemiJoinHash(
right: SparkPlan,
condition: Option[Expression]) extends BinaryNode with HashSemiJoin {
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
protected override def doExecute(): RDD[InternalRow] = {
- val input = right.execute().map(_.copy()).collect()
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
+ val input = right.execute().map { row =>
+ numRightRows += 1
+ row.copy()
+ }.collect()
if (condition.isEmpty) {
- val hashSet = buildKeyHashSet(input.toIterator)
+ val hashSet = buildKeyHashSet(input.toIterator, SQLMetrics.nullLongMetric)
val broadcastedRelation = sparkContext.broadcast(hashSet)
left.execute().mapPartitions { streamIter =>
- hashSemiJoin(streamIter, broadcastedRelation.value)
+ hashSemiJoin(streamIter, numLeftRows, broadcastedRelation.value, numOutputRows)
}
} else {
- val hashRelation = HashedRelation(input.toIterator, rightKeyGenerator, input.size)
+ val hashRelation =
+ HashedRelation(input.toIterator, SQLMetrics.nullLongMetric, rightKeyGenerator, input.size)
val broadcastedRelation = sparkContext.broadcast(hashRelation)
left.execute().mapPartitions { streamIter =>
@@ -59,7 +73,7 @@ case class BroadcastLeftSemiJoinHash(
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
case _ =>
}
- hashSemiJoin(streamIter, hashedRelation)
+ hashSemiJoin(streamIter, numLeftRows, hashedRelation, numOutputRows)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 017a44b9ca..28c88b1b03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.collection.CompactBuffer
/**
@@ -38,6 +39,11 @@ case class BroadcastNestedLoopJoin(
condition: Option[Expression]) extends BinaryNode {
// TODO: Override requiredChildDistribution.
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
/** BuildRight means the right relation <=> the broadcast relation. */
private val (streamed, broadcast) = buildSide match {
case BuildRight => (left, right)
@@ -75,9 +81,17 @@ case class BroadcastNestedLoopJoin(
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
protected override def doExecute(): RDD[InternalRow] = {
+ val (numStreamedRows, numBuildRows) = buildSide match {
+ case BuildRight => (longMetric("numLeftRows"), longMetric("numRightRows"))
+ case BuildLeft => (longMetric("numRightRows"), longMetric("numLeftRows"))
+ }
+ val numOutputRows = longMetric("numOutputRows")
+
val broadcastedRelation =
- sparkContext.broadcast(broadcast.execute().map(_.copy())
- .collect().toIndexedSeq)
+ sparkContext.broadcast(broadcast.execute().map { row =>
+ numBuildRows += 1
+ row.copy()
+ }.collect().toIndexedSeq)
/** All rows that either match both-way, or rows from streamed joined with nulls. */
val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
@@ -94,6 +108,7 @@ case class BroadcastNestedLoopJoin(
streamedIter.foreach { streamedRow =>
var i = 0
var streamRowMatched = false
+ numStreamedRows += 1
while (i < broadcastedRelation.value.size) {
val broadcastedRow = broadcastedRelation.value(i)
@@ -162,6 +177,12 @@ case class BroadcastNestedLoopJoin(
// TODO: Breaks lineage.
sparkContext.union(
- matchesOrStreamedRowsWithNulls.flatMap(_._1), sparkContext.makeRDD(broadcastRowsWithNulls))
+ matchesOrStreamedRowsWithNulls.flatMap(_._1),
+ sparkContext.makeRDD(broadcastRowsWithNulls)
+ ).map { row =>
+ // `broadcastRowsWithNulls` doesn't run in a job, so we have to track numOutputRows here.
+ numOutputRows += 1
+ row
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 261b472415..2115f40702 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -22,6 +22,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -30,13 +31,31 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output ++ right.output
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
protected override def doExecute(): RDD[InternalRow] = {
- val leftResults = left.execute().map(_.copy())
- val rightResults = right.execute().map(_.copy())
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
+ val leftResults = left.execute().map { row =>
+ numLeftRows += 1
+ row.copy()
+ }
+ val rightResults = right.execute().map { row =>
+ numRightRows += 1
+ row.copy()
+ }
leftResults.cartesian(rightResults).mapPartitions { iter =>
val joinedRow = new JoinedRow
- iter.map(r => joinedRow(r._1, r._2))
+ iter.map { r =>
+ numOutputRows += 1
+ joinedRow(r._1, r._2)
+ }
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 22d46d1c3e..7ce4a51783 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.joins
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.LongSQLMetric
trait HashJoin {
@@ -69,7 +70,9 @@ trait HashJoin {
protected def hashJoin(
streamIter: Iterator[InternalRow],
- hashedRelation: HashedRelation): Iterator[InternalRow] =
+ numStreamRows: LongSQLMetric,
+ hashedRelation: HashedRelation,
+ numOutputRows: LongSQLMetric): Iterator[InternalRow] =
{
new Iterator[InternalRow] {
private[this] var currentStreamedRow: InternalRow = _
@@ -98,6 +101,7 @@ trait HashJoin {
case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow)
}
currentMatchPosition += 1
+ numOutputRows += 1
resultProjection(ret)
}
@@ -113,6 +117,7 @@ trait HashJoin {
while (currentHashMatches == null && streamIter.hasNext) {
currentStreamedRow = streamIter.next()
+ numStreamRows += 1
val key = joinKeys(currentStreamedRow)
if (!key.anyNull) {
currentHashMatches = hashedRelation.get(key)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index 701bd3cd86..66903347c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.util.collection.CompactBuffer
@DeveloperApi
@@ -114,22 +115,28 @@ trait HashOuterJoin {
key: InternalRow,
joinedRow: JoinedRow,
rightIter: Iterable[InternalRow],
- resultProjection: InternalRow => InternalRow): Iterator[InternalRow] = {
+ resultProjection: InternalRow => InternalRow,
+ numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
val ret: Iterable[InternalRow] = {
if (!key.anyNull) {
val temp = if (rightIter != null) {
rightIter.collect {
- case r if boundCondition(joinedRow.withRight(r)) => resultProjection(joinedRow).copy()
+ case r if boundCondition(joinedRow.withRight(r)) => {
+ numOutputRows += 1
+ resultProjection(joinedRow).copy()
+ }
}
} else {
List.empty
}
if (temp.isEmpty) {
+ numOutputRows += 1
resultProjection(joinedRow.withRight(rightNullRow)) :: Nil
} else {
temp
}
} else {
+ numOutputRows += 1
resultProjection(joinedRow.withRight(rightNullRow)) :: Nil
}
}
@@ -140,22 +147,28 @@ trait HashOuterJoin {
key: InternalRow,
leftIter: Iterable[InternalRow],
joinedRow: JoinedRow,
- resultProjection: InternalRow => InternalRow): Iterator[InternalRow] = {
+ resultProjection: InternalRow => InternalRow,
+ numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
val ret: Iterable[InternalRow] = {
if (!key.anyNull) {
val temp = if (leftIter != null) {
leftIter.collect {
- case l if boundCondition(joinedRow.withLeft(l)) => resultProjection(joinedRow).copy()
+ case l if boundCondition(joinedRow.withLeft(l)) => {
+ numOutputRows += 1
+ resultProjection(joinedRow).copy()
+ }
}
} else {
List.empty
}
if (temp.isEmpty) {
+ numOutputRows += 1
resultProjection(joinedRow.withLeft(leftNullRow)) :: Nil
} else {
temp
}
} else {
+ numOutputRows += 1
resultProjection(joinedRow.withLeft(leftNullRow)) :: Nil
}
}
@@ -164,7 +177,7 @@ trait HashOuterJoin {
protected[this] def fullOuterIterator(
key: InternalRow, leftIter: Iterable[InternalRow], rightIter: Iterable[InternalRow],
- joinedRow: JoinedRow): Iterator[InternalRow] = {
+ joinedRow: JoinedRow, numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
if (!key.anyNull) {
// Store the positions of records in right, if one of its associated row satisfy
// the join condition.
@@ -177,6 +190,7 @@ trait HashOuterJoin {
// append them directly
case (r, idx) if boundCondition(joinedRow.withRight(r)) =>
+ numOutputRows += 1
matched = true
// if the row satisfy the join condition, add its index into the matched set
rightMatchedSet.add(idx)
@@ -189,6 +203,7 @@ trait HashOuterJoin {
// as we don't know whether we need to append it until finish iterating all
// of the records in right side.
// If we didn't get any proper row, then append a single row with empty right.
+ numOutputRows += 1
joinedRow.withRight(rightNullRow).copy()
})
} ++ rightIter.zipWithIndex.collect {
@@ -197,12 +212,15 @@ trait HashOuterJoin {
// Re-visiting the records in right, and append additional row with empty left, if its not
// in the matched set.
case (r, idx) if !rightMatchedSet.contains(idx) =>
+ numOutputRows += 1
joinedRow(leftNullRow, r).copy()
}
} else {
leftIter.iterator.map[InternalRow] { l =>
+ numOutputRows += 1
joinedRow(l, rightNullRow).copy()
} ++ rightIter.iterator.map[InternalRow] { r =>
+ numOutputRows += 1
joinedRow(leftNullRow, r).copy()
}
}
@@ -211,10 +229,12 @@ trait HashOuterJoin {
// This is only used by FullOuter
protected[this] def buildHashTable(
iter: Iterator[InternalRow],
+ numIterRows: LongSQLMetric,
keyGenerator: Projection): JavaHashMap[InternalRow, CompactBuffer[InternalRow]] = {
val hashTable = new JavaHashMap[InternalRow, CompactBuffer[InternalRow]]()
while (iter.hasNext) {
val currentRow = iter.next()
+ numIterRows += 1
val rowKey = keyGenerator(currentRow)
var existingMatchList = hashTable.get(rowKey)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index 82dd6eb7e7..beb141ade6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.joins
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.LongSQLMetric
trait HashSemiJoin {
@@ -61,13 +62,15 @@ trait HashSemiJoin {
@transient private lazy val boundCondition =
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
- protected def buildKeyHashSet(buildIter: Iterator[InternalRow]): java.util.Set[InternalRow] = {
+ protected def buildKeyHashSet(
+ buildIter: Iterator[InternalRow], numBuildRows: LongSQLMetric): java.util.Set[InternalRow] = {
val hashSet = new java.util.HashSet[InternalRow]()
// Create a Hash set of buildKeys
val rightKey = rightKeyGenerator
while (buildIter.hasNext) {
val currentRow = buildIter.next()
+ numBuildRows += 1
val rowKey = rightKey(currentRow)
if (!rowKey.anyNull) {
val keyExists = hashSet.contains(rowKey)
@@ -82,25 +85,35 @@ trait HashSemiJoin {
protected def hashSemiJoin(
streamIter: Iterator[InternalRow],
- hashSet: java.util.Set[InternalRow]): Iterator[InternalRow] = {
+ numStreamRows: LongSQLMetric,
+ hashSet: java.util.Set[InternalRow],
+ numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
val joinKeys = leftKeyGenerator
streamIter.filter(current => {
+ numStreamRows += 1
val key = joinKeys(current)
- !key.anyNull && hashSet.contains(key)
+ val r = !key.anyNull && hashSet.contains(key)
+ if (r) numOutputRows += 1
+ r
})
}
protected def hashSemiJoin(
streamIter: Iterator[InternalRow],
- hashedRelation: HashedRelation): Iterator[InternalRow] = {
+ numStreamRows: LongSQLMetric,
+ hashedRelation: HashedRelation,
+ numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
val joinKeys = leftKeyGenerator
val joinedRow = new JoinedRow
streamIter.filter { current =>
+ numStreamRows += 1
val key = joinKeys(current)
lazy val rowBuffer = hashedRelation.get(key)
- !key.anyNull && rowBuffer != null && rowBuffer.exists {
+ val r = !key.anyNull && rowBuffer != null && rowBuffer.exists {
(row: InternalRow) => boundCondition(joinedRow(current, row))
}
+ if (r) numOutputRows += 1
+ r
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 63d35d0f02..c1bc7947aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -25,6 +25,7 @@ import org.apache.spark.shuffle.ShuffleMemoryManager
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkSqlSerializer
+import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
@@ -112,11 +113,13 @@ private[joins] object HashedRelation {
def apply(
input: Iterator[InternalRow],
+ numInputRows: LongSQLMetric,
keyGenerator: Projection,
sizeEstimate: Int = 64): HashedRelation = {
if (keyGenerator.isInstanceOf[UnsafeProjection]) {
- return UnsafeHashedRelation(input, keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate)
+ return UnsafeHashedRelation(
+ input, numInputRows, keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate)
}
// TODO: Use Spark's HashMap implementation.
@@ -130,6 +133,7 @@ private[joins] object HashedRelation {
// Create a mapping of buildKeys -> rows
while (input.hasNext) {
currentRow = input.next()
+ numInputRows += 1
val rowKey = keyGenerator(currentRow)
if (!rowKey.anyNull) {
val existingMatchList = hashTable.get(rowKey)
@@ -331,6 +335,7 @@ private[joins] object UnsafeHashedRelation {
def apply(
input: Iterator[InternalRow],
+ numInputRows: LongSQLMetric,
keyGenerator: UnsafeProjection,
sizeEstimate: Int): HashedRelation = {
@@ -340,6 +345,7 @@ private[joins] object UnsafeHashedRelation {
// Create a mapping of buildKeys -> rows
while (input.hasNext) {
val unsafeRow = input.next().asInstanceOf[UnsafeRow]
+ numInputRows += 1
val rowKey = keyGenerator(unsafeRow)
if (!rowKey.anyNull) {
val existingMatchList = hashTable.get(rowKey)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index 4443455ef1..ad6362542f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -35,6 +36,11 @@ case class LeftSemiJoinBNL(
extends BinaryNode {
// TODO: Override requiredChildDistribution.
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def outputPartitioning: Partitioning = streamed.outputPartitioning
override def output: Seq[Attribute] = left.output
@@ -52,13 +58,21 @@ case class LeftSemiJoinBNL(
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
protected override def doExecute(): RDD[InternalRow] = {
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
val broadcastedRelation =
- sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
+ sparkContext.broadcast(broadcast.execute().map { row =>
+ numRightRows += 1
+ row.copy()
+ }.collect().toIndexedSeq)
streamed.execute().mapPartitions { streamedIter =>
val joinedRow = new JoinedRow
streamedIter.filter(streamedRow => {
+ numLeftRows += 1
var i = 0
var matched = false
@@ -69,6 +83,9 @@ case class LeftSemiJoinBNL(
}
i += 1
}
+ if (matched) {
+ numOutputRows += 1
+ }
matched
})
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index 68ccd34d8e..18808adaac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, Distribution, ClusteredDistribution}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -37,19 +38,28 @@ case class LeftSemiJoinHash(
right: SparkPlan,
condition: Option[Expression]) extends BinaryNode with HashSemiJoin {
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def outputPartitioning: Partitioning = left.outputPartitioning
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
protected override def doExecute(): RDD[InternalRow] = {
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) =>
if (condition.isEmpty) {
- val hashSet = buildKeyHashSet(buildIter)
- hashSemiJoin(streamIter, hashSet)
+ val hashSet = buildKeyHashSet(buildIter, numRightRows)
+ hashSemiJoin(streamIter, numLeftRows, hashSet, numOutputRows)
} else {
- val hashRelation = HashedRelation(buildIter, rightKeyGenerator)
- hashSemiJoin(streamIter, hashRelation)
+ val hashRelation = HashedRelation(buildIter, numRightRows, rightKeyGenerator)
+ hashSemiJoin(streamIter, numLeftRows, hashRelation, numOutputRows)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index c923dc837c..fc8c9439a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -38,7 +39,10 @@ case class ShuffledHashJoin(
right: SparkPlan)
extends BinaryNode with HashJoin {
- override protected[sql] val trackNumOfRowsEnabled = true
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def outputPartitioning: Partitioning =
PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
@@ -47,9 +51,15 @@ case class ShuffledHashJoin(
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
protected override def doExecute(): RDD[InternalRow] = {
+ val (numBuildRows, numStreamedRows) = buildSide match {
+ case BuildLeft => (longMetric("numLeftRows"), longMetric("numRightRows"))
+ case BuildRight => (longMetric("numRightRows"), longMetric("numLeftRows"))
+ }
+ val numOutputRows = longMetric("numOutputRows")
+
buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
- val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
- hashJoin(streamIter, hashed)
+ val hashed = HashedRelation(buildIter, numBuildRows, buildSideKeyGenerator)
+ hashJoin(streamIter, numStreamedRows, hashed, numOutputRows)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
index 6a8c35efca..ed282f98b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -41,6 +42,11 @@ case class ShuffledHashOuterJoin(
left: SparkPlan,
right: SparkPlan) extends BinaryNode with HashOuterJoin {
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
@@ -53,39 +59,48 @@ case class ShuffledHashOuterJoin(
}
protected override def doExecute(): RDD[InternalRow] = {
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
val joinedRow = new JoinedRow()
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
// TODO this probably can be replaced by external sort (sort merged join?)
joinType match {
case LeftOuter =>
- val hashed = HashedRelation(rightIter, buildKeyGenerator)
+ val hashed = HashedRelation(rightIter, numRightRows, buildKeyGenerator)
val keyGenerator = streamedKeyGenerator
val resultProj = resultProjection
leftIter.flatMap( currentRow => {
+ numLeftRows += 1
val rowKey = keyGenerator(currentRow)
joinedRow.withLeft(currentRow)
- leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey), resultProj)
+ leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey), resultProj, numOutputRows)
})
case RightOuter =>
- val hashed = HashedRelation(leftIter, buildKeyGenerator)
+ val hashed = HashedRelation(leftIter, numLeftRows, buildKeyGenerator)
val keyGenerator = streamedKeyGenerator
val resultProj = resultProjection
rightIter.flatMap ( currentRow => {
+ numRightRows += 1
val rowKey = keyGenerator(currentRow)
joinedRow.withRight(currentRow)
- rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow, resultProj)
+ rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow, resultProj, numOutputRows)
})
case FullOuter =>
// TODO(davies): use UnsafeRow
- val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
- val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
+ val leftHashTable =
+ buildHashTable(leftIter, numLeftRows, newProjection(leftKeys, left.output))
+ val rightHashTable =
+ buildHashTable(rightIter, numRightRows, newProjection(rightKeys, right.output))
(leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
fullOuterIterator(key,
leftHashTable.getOrElse(key, EMPTY_LIST),
rightHashTable.getOrElse(key, EMPTY_LIST),
- joinedRow)
+ joinedRow,
+ numOutputRows)
}
case x =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 6d656ea284..6b7322671d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{BinaryNode, RowIterator, SparkPlan}
+import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
/**
* :: DeveloperApi ::
@@ -37,6 +38,11 @@ case class SortMergeJoin(
left: SparkPlan,
right: SparkPlan) extends BinaryNode {
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def output: Seq[Attribute] = left.output ++ right.output
override def outputPartitioning: Partitioning =
@@ -70,6 +76,10 @@ case class SortMergeJoin(
}
protected override def doExecute(): RDD[InternalRow] = {
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
new RowIterator {
// An ordering that can be used to compare keys from both sides.
@@ -82,7 +92,9 @@ case class SortMergeJoin(
rightKeyGenerator,
keyOrdering,
RowIterator.fromScala(leftIter),
- RowIterator.fromScala(rightIter)
+ numLeftRows,
+ RowIterator.fromScala(rightIter),
+ numRightRows
)
private[this] val joinRow = new JoinedRow
private[this] val resultProjection: (InternalRow) => InternalRow = {
@@ -108,6 +120,7 @@ case class SortMergeJoin(
if (currentLeftRow != null) {
joinRow(currentLeftRow, currentRightMatches(currentMatchIdx))
currentMatchIdx += 1
+ numOutputRows += 1
true
} else {
false
@@ -144,7 +157,9 @@ private[joins] class SortMergeJoinScanner(
bufferedKeyGenerator: Projection,
keyOrdering: Ordering[InternalRow],
streamedIter: RowIterator,
- bufferedIter: RowIterator) {
+ numStreamedRows: LongSQLMetric,
+ bufferedIter: RowIterator,
+ numBufferedRows: LongSQLMetric) {
private[this] var streamedRow: InternalRow = _
private[this] var streamedRowKey: InternalRow = _
private[this] var bufferedRow: InternalRow = _
@@ -269,6 +284,7 @@ private[joins] class SortMergeJoinScanner(
if (streamedIter.advanceNext()) {
streamedRow = streamedIter.getRow
streamedRowKey = streamedKeyGenerator(streamedRow)
+ numStreamedRows += 1
true
} else {
streamedRow = null
@@ -286,6 +302,7 @@ private[joins] class SortMergeJoinScanner(
while (!foundRow && bufferedIter.advanceNext()) {
bufferedRow = bufferedIter.getRow
bufferedRowKey = bufferedKeyGenerator(bufferedRow)
+ numBufferedRows += 1
foundRow = !bufferedRowKey.anyNull
}
if (!foundRow) {
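
The scanner above receives one metric per input and bumps it each time it actually pulls a row from the streamed or buffered iterator, so every consumed row is counted exactly once even when rows with null join keys are skipped. A rough, Spark-free sketch of that count-on-successful-advance idea (all names here are hypothetical):

  object AdvanceCountSketch extends App {
    final class Metric { var value = 0L; def +=(n: Long): Unit = value += n }

    // RowIterator-style cursor: advanceNext() pulls one row and reports success;
    // the metric is bumped exactly once per row actually pulled.
    final class CountingCursor[T](underlying: Iterator[T], metric: Metric) {
      private var current: Option[T] = None
      def advanceNext(): Boolean =
        if (underlying.hasNext) { current = Some(underlying.next()); metric += 1; true }
        else { current = None; false }
      def getRow: T = current.get
    }

    val numStreamedRows = new Metric
    val cursor = new CountingCursor(Iterator(10, 20, 30), numStreamedRows)
    while (cursor.advanceNext()) println(cursor.getRow)
    assert(numStreamedRows.value == 3)
  }
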
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
index 5326966b07..dea9e5e580 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{BinaryNode, RowIterator, SparkPlan}
+import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
/**
* :: DeveloperApi ::
@@ -40,6 +41,11 @@ case class SortMergeOuterJoin(
left: SparkPlan,
right: SparkPlan) extends BinaryNode {
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def output: Seq[Attribute] = {
joinType match {
case LeftOuter =>
@@ -108,6 +114,10 @@ case class SortMergeOuterJoin(
}
override def doExecute(): RDD[InternalRow] = {
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
// An ordering that can be used to compare keys from both sides.
val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
@@ -133,10 +143,13 @@ case class SortMergeOuterJoin(
bufferedKeyGenerator = createRightKeyGenerator(),
keyOrdering,
streamedIter = RowIterator.fromScala(leftIter),
- bufferedIter = RowIterator.fromScala(rightIter)
+ numLeftRows,
+ bufferedIter = RowIterator.fromScala(rightIter),
+ numRightRows
)
val rightNullRow = new GenericInternalRow(right.output.length)
- new LeftOuterIterator(smjScanner, rightNullRow, boundCondition, resultProj).toScala
+ new LeftOuterIterator(
+ smjScanner, rightNullRow, boundCondition, resultProj, numOutputRows).toScala
case RightOuter =>
val smjScanner = new SortMergeJoinScanner(
@@ -144,10 +157,13 @@ case class SortMergeOuterJoin(
bufferedKeyGenerator = createLeftKeyGenerator(),
keyOrdering,
streamedIter = RowIterator.fromScala(rightIter),
- bufferedIter = RowIterator.fromScala(leftIter)
+ numRightRows,
+ bufferedIter = RowIterator.fromScala(leftIter),
+ numLeftRows
)
val leftNullRow = new GenericInternalRow(left.output.length)
- new RightOuterIterator(smjScanner, leftNullRow, boundCondition, resultProj).toScala
+ new RightOuterIterator(
+ smjScanner, leftNullRow, boundCondition, resultProj, numOutputRows).toScala
case x =>
throw new IllegalArgumentException(
@@ -162,7 +178,8 @@ private class LeftOuterIterator(
smjScanner: SortMergeJoinScanner,
rightNullRow: InternalRow,
boundCondition: InternalRow => Boolean,
- resultProj: InternalRow => InternalRow
+ resultProj: InternalRow => InternalRow,
+ numRows: LongSQLMetric
) extends RowIterator {
private[this] val joinedRow: JoinedRow = new JoinedRow()
private[this] var rightIdx: Int = 0
@@ -198,7 +215,9 @@ private class LeftOuterIterator(
}
override def advanceNext(): Boolean = {
- advanceRightUntilBoundConditionSatisfied() || advanceLeft()
+ val r = advanceRightUntilBoundConditionSatisfied() || advanceLeft()
+ if (r) numRows += 1
+ r
}
override def getRow: InternalRow = resultProj(joinedRow)
@@ -208,7 +227,8 @@ private class RightOuterIterator(
smjScanner: SortMergeJoinScanner,
leftNullRow: InternalRow,
boundCondition: InternalRow => Boolean,
- resultProj: InternalRow => InternalRow
+ resultProj: InternalRow => InternalRow,
+ numRows: LongSQLMetric
) extends RowIterator {
private[this] val joinedRow: JoinedRow = new JoinedRow()
private[this] var leftIdx: Int = 0
@@ -244,7 +264,9 @@ private class RightOuterIterator(
}
override def advanceNext(): Boolean = {
- advanceLeftUntilBoundConditionSatisfied() || advanceRight()
+ val r = advanceLeftUntilBoundConditionSatisfied() || advanceRight()
+ if (r) numRows += 1
+ r
}
override def getRow: InternalRow = resultProj(joinedRow)
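
Both outer-row iterators above increment the output metric only when advanceNext() returns true, so matched rows and null-padded rows are each counted exactly once. A small, Spark-free sketch of wrapping an advance function with such a counter, using made-up names:

  object OutputCountSketch extends App {
    final class Metric { var value = 0L; def +=(n: Long): Unit = value += n }

    // Wrap an advance function so every successful advance bumps the output counter.
    def countingAdvance(advance: () => Boolean, metric: Metric): () => Boolean =
      () => { val ok = advance(); if (ok) metric += 1; ok }

    val numOutputRows = new Metric
    val rows = Iterator(1, 2, 3, 4)
    val advance = countingAdvance(() => rows.hasNext && { rows.next(); true }, numOutputRows)
    while (advance()) ()
    assert(numOutputRows.value == 4)
  }
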
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 1b51a5e5c8..7a2a98ec18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -112,4 +112,10 @@ private[sql] object SQLMetrics {
sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}
+
+ /**
+ * A metric whose value will be ignored. Use this when a metric parameter is required but the
+ * value does not matter.
+ */
+ val nullLongMetric = new LongSQLMetric("null")
}
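
nullLongMetric acts as a null object: code paths that now require a metric argument but have nothing meaningful to report can pass it, and its accumulated value is simply never read. A hedged, Spark-free sketch of the same idea (buildIndex and the other names below are invented for illustration):

  object NullMetricSketch extends App {
    final class Metric(val name: String) { var value = 0L; def +=(n: Long): Unit = value += n }

    // A shared metric whose value nobody ever reads; it avoids Option[Metric] plumbing
    // when an API unconditionally requires a metric parameter.
    val nullMetric = new Metric("null")

    def buildIndex(rows: Iterator[String], numRows: Metric): Map[String, Int] =
      rows.map { r => numRows += 1; r }.zipWithIndex.toMap

    // A caller that does not care about the count simply passes the ignored metric.
    println(buildIndex(Iterator("x", "y"), nullMetric))
  }
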
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 8b1a9b21a9..a1fa2c3864 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -22,6 +22,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.collection.CompactBuffer
@@ -35,7 +37,8 @@ class HashedRelationSuite extends SparkFunSuite {
test("GeneralHashedRelation") {
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
- val hashed = HashedRelation(data.iterator, keyProjection)
+ val numDataRows = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "data")
+ val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
assert(hashed.isInstanceOf[GeneralHashedRelation])
assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0)))
@@ -45,11 +48,13 @@ class HashedRelationSuite extends SparkFunSuite {
val data2 = CompactBuffer[InternalRow](data(2))
data2 += data(2)
assert(hashed.get(data(2)) === data2)
+ assert(numDataRows.value.value === data.length)
}
test("UniqueKeyHashedRelation") {
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2))
- val hashed = HashedRelation(data.iterator, keyProjection)
+ val numDataRows = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "data")
+ val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
assert(hashed.isInstanceOf[UniqueKeyHashedRelation])
assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0)))
@@ -62,17 +67,19 @@ class HashedRelationSuite extends SparkFunSuite {
assert(uniqHashed.getValue(data(1)) === data(1))
assert(uniqHashed.getValue(data(2)) === data(2))
assert(uniqHashed.getValue(InternalRow(10)) === null)
+ assert(numDataRows.value.value === data.length)
}
test("UnsafeHashedRelation") {
val schema = StructType(StructField("a", IntegerType, true) :: Nil)
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
+ val numDataRows = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "data")
val toUnsafe = UnsafeProjection.create(schema)
val unsafeData = data.map(toUnsafe(_).copy()).toArray
val buildKey = Seq(BoundReference(0, IntegerType, false))
val keyGenerator = UnsafeProjection.create(buildKey)
- val hashed = UnsafeHashedRelation(unsafeData.iterator, keyGenerator, 1)
+ val hashed = UnsafeHashedRelation(unsafeData.iterator, numDataRows, keyGenerator, 1)
assert(hashed.isInstanceOf[UnsafeHashedRelation])
assert(hashed.get(unsafeData(0)) === CompactBuffer[InternalRow](unsafeData(0)))
@@ -94,5 +101,6 @@ class HashedRelationSuite extends SparkFunSuite {
assert(hashed2.get(unsafeData(1)) === CompactBuffer[InternalRow](unsafeData(1)))
assert(hashed2.get(toUnsafe(InternalRow(10))) === null)
assert(hashed2.get(unsafeData(2)) === data2)
+ assert(numDataRows.value.value === data.length)
}
}
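
The updated HashedRelationSuite tests all follow one pattern: create a metric, pass it where the constructor now requires one, and assert on its value after the relation is built. A Spark-free sketch of that assert-on-counter pattern, with illustrative names only:

  object MetricAssertSketch extends App {
    final class Metric { var value = 0L; def +=(n: Long): Unit = value += n }

    // Build a toy "hashed relation" from a counted iterator.
    def buildRelation(rows: Iterator[Int], numRows: Metric): Map[Int, Seq[Int]] =
      rows.map { r => numRows += 1; r }.toSeq.groupBy(identity)

    val data = Array(0, 1, 2, 2)
    val numDataRows = new Metric
    val hashed = buildRelation(data.iterator, numDataRows)

    assert(hashed(2) == Seq(2, 2))             // duplicate keys collect into one bucket
    assert(numDataRows.value == data.length)   // every input row was counted exactly once
  }
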
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 953284c98b..7383d3f8fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -25,15 +25,24 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.ui.SparkPlanGraph
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.{SQLTestUtils, TestSQLContext}
import org.apache.spark.util.Utils
-class SQLMetricsSuite extends SparkFunSuite {
+class SQLMetricsSuite extends SparkFunSuite with SQLTestUtils {
+ override val sqlContext = TestSQLContext
+
+ import sqlContext.implicits._
test("LongSQLMetric should not box Long") {
val l = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "long")
- val f = () => { l += 1L }
+ val f = () => {
+ l += 1L
+ l.add(1L)
+ }
BoxingFinder.getClassReader(f.getClass).foreach { cl =>
val boxingFinder = new BoxingFinder()
cl.accept(boxingFinder, 0)
@@ -51,6 +60,441 @@ class SQLMetricsSuite extends SparkFunSuite {
assert(boxingFinder.boxingInvokes.nonEmpty, "Did not find boxing in this test")
}
}
+
+ /**
+ * Call `df.collect()` and verify that the collected metrics are the same as `expectedMetrics`.
+ *
+ * @param df `DataFrame` to run
+ * @param expectedNumOfJobs number of jobs that will run
+ * @param expectedMetrics the expected metrics. The format is
+ * `nodeId -> (operatorName, metric name -> metric value)`.
+ */
+ private def testSparkPlanMetrics(
+ df: DataFrame,
+ expectedNumOfJobs: Int,
+ expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
+ val previousExecutionIds = TestSQLContext.listener.executionIdToData.keySet
+ df.collect()
+ TestSQLContext.sparkContext.listenerBus.waitUntilEmpty(10000)
+ val executionIds = TestSQLContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
+ assert(executionIds.size === 1)
+ val executionId = executionIds.head
+ val jobs = TestSQLContext.listener.getExecution(executionId).get.jobs
+ // Use "<=" because there is a race condition that we may miss some jobs
+ // TODO Change it to "=" once we fix the race condition that missing the JobStarted event.
+ assert(jobs.size <= expectedNumOfJobs)
+ if (jobs.size == expectedNumOfJobs) {
+ // If we can track all jobs, check the metric values
+ val metricValues = TestSQLContext.listener.getExecutionMetrics(executionId)
+ val actualMetrics = SparkPlanGraph(df.queryExecution.executedPlan).nodes.filter { node =>
+ expectedMetrics.contains(node.id)
+ }.map { node =>
+ val nodeMetrics = node.metrics.map { metric =>
+ val metricValue = metricValues(metric.accumulatorId)
+ (metric.name, metricValue)
+ }.toMap
+ (node.id, node.name -> nodeMetrics)
+ }.toMap
+ assert(expectedMetrics === actualMetrics)
+ } else {
+ // TODO Remove this "else" once we fix the race condition that drops the JobStarted event.
+ // Since we cannot track all jobs, the metric values could be wrong and we should not check
+ // them.
+ logWarning("Due to a race condition, we miss some jobs and cannot verify the metric values")
+ }
+ }
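
Both this helper and the "save metrics" test at the end use the same trick to find the execution they just triggered: snapshot the listener's known execution ids before running the action, then diff afterwards and expect exactly one new id. A tiny, Spark-free sketch of that snapshot-and-diff pattern (the registry below is only a stand-in for listener.executionIdToData):

  object SnapshotDiffSketch extends App {
    // A registry standing in for the listener's executionId -> data map.
    val registry = scala.collection.mutable.Map[Long, String](1L -> "old execution")

    val before = registry.keySet.toSet         // snapshot the known execution ids
    registry += 2L -> "execution triggered by df.collect()"
    val created = registry.keySet.toSet.diff(before)

    assert(created == Set(2L))                 // exactly one new execution was observed
  }
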
+
+ test("Project metrics") {
+ withSQLConf(
+ SQLConf.UNSAFE_ENABLED.key -> "false",
+ SQLConf.CODEGEN_ENABLED.key -> "false",
+ SQLConf.TUNGSTEN_ENABLED.key -> "false") {
+ // Assume the execution plan is
+ // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
+ val df = TestData.person.select('name)
+ testSparkPlanMetrics(df, 1, Map(
+ 0L ->("Project", Map(
+ "number of rows" -> 2L)))
+ )
+ }
+ }
+
+ test("TungstenProject metrics") {
+ withSQLConf(
+ SQLConf.UNSAFE_ENABLED.key -> "true",
+ SQLConf.CODEGEN_ENABLED.key -> "true",
+ SQLConf.TUNGSTEN_ENABLED.key -> "true") {
+ // Assume the execution plan is
+ // PhysicalRDD(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = TestData.person.select('name)
+ testSparkPlanMetrics(df, 1, Map(
+ 0L ->("TungstenProject", Map(
+ "number of rows" -> 2L)))
+ )
+ }
+ }
+
+ test("Filter metrics") {
+ // Assume the execution plan is
+ // PhysicalRDD(nodeId = 1) -> Filter(nodeId = 0)
+ val df = TestData.person.filter('age < 25)
+ testSparkPlanMetrics(df, 1, Map(
+ 0L -> ("Filter", Map(
+ "number of input rows" -> 2L,
+ "number of output rows" -> 1L)))
+ )
+ }
+
+ test("Aggregate metrics") {
+ withSQLConf(
+ SQLConf.UNSAFE_ENABLED.key -> "false",
+ SQLConf.CODEGEN_ENABLED.key -> "false",
+ SQLConf.TUNGSTEN_ENABLED.key -> "false") {
+ // Assume the execution plan is
+ // ... -> Aggregate(nodeId = 2) -> TungstenExchange(nodeId = 1) -> Aggregate(nodeId = 0)
+ val df = TestData.testData2.groupBy().count() // 2 partitions
+ testSparkPlanMetrics(df, 1, Map(
+ 2L -> ("Aggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 2L)),
+ 0L -> ("Aggregate", Map(
+ "number of input rows" -> 2L,
+ "number of output rows" -> 1L)))
+ )
+
+ // 2 partitions and each partition contains 2 keys
+ val df2 = TestData.testData2.groupBy('a).count()
+ testSparkPlanMetrics(df2, 1, Map(
+ 2L -> ("Aggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 4L)),
+ 0L -> ("Aggregate", Map(
+ "number of input rows" -> 4L,
+ "number of output rows" -> 3L)))
+ )
+ }
+ }
+
+ test("SortBasedAggregate metrics") {
+ // Because SortBasedAggregate may skip different rows if the number of partitions is different,
+ // this test should use a deterministic number of partitions.
+ withSQLConf(
+ SQLConf.UNSAFE_ENABLED.key -> "false",
+ SQLConf.CODEGEN_ENABLED.key -> "true",
+ SQLConf.TUNGSTEN_ENABLED.key -> "true") {
+ // Assume the execution plan is
+ // ... -> SortBasedAggregate(nodeId = 2) -> TungstenExchange(nodeId = 1) ->
+ // SortBasedAggregate(nodeId = 0)
+ val df = TestData.testData2.groupBy().count() // 2 partitions
+ testSparkPlanMetrics(df, 1, Map(
+ 2L -> ("SortBasedAggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 2L)),
+ 0L -> ("SortBasedAggregate", Map(
+ "number of input rows" -> 2L,
+ "number of output rows" -> 1L)))
+ )
+
+ // Assume the execution plan is
+ // ... -> SortBasedAggregate(nodeId = 3) -> TungstenExchange(nodeId = 2)
+ // -> ExternalSort(nodeId = 1)-> SortBasedAggregate(nodeId = 0)
+ // 2 partitions and each partition contains 2 keys
+ val df2 = TestData.testData2.groupBy('a).count()
+ testSparkPlanMetrics(df2, 1, Map(
+ 3L -> ("SortBasedAggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 4L)),
+ 0L -> ("SortBasedAggregate", Map(
+ "number of input rows" -> 4L,
+ "number of output rows" -> 3L)))
+ )
+ }
+ }
+
+ test("TungstenAggregate metrics") {
+ withSQLConf(
+ SQLConf.UNSAFE_ENABLED.key -> "true",
+ SQLConf.CODEGEN_ENABLED.key -> "true",
+ SQLConf.TUNGSTEN_ENABLED.key -> "true") {
+ // Assume the execution plan is
+ // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
+ // -> TungstenAggregate(nodeId = 0)
+ val df = TestData.testData2.groupBy().count() // 2 partitions
+ testSparkPlanMetrics(df, 1, Map(
+ 2L -> ("TungstenAggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 2L)),
+ 0L -> ("TungstenAggregate", Map(
+ "number of input rows" -> 2L,
+ "number of output rows" -> 1L)))
+ )
+
+ // 2 partitions and each partition contains 2 keys
+ val df2 = TestData.testData2.groupBy('a).count()
+ testSparkPlanMetrics(df2, 1, Map(
+ 2L -> ("TungstenAggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 4L)),
+ 0L -> ("TungstenAggregate", Map(
+ "number of input rows" -> 4L,
+ "number of output rows" -> 3L)))
+ )
+ }
+ }
+
+ test("SortMergeJoin metrics") {
+ // Because SortMergeJoin may skip different rows if the number of partitions is different, this
+ // test should use a deterministic number of partitions.
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
+ val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
+ // Assume the execution plan is
+ // ... -> SortMergeJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
+ testSparkPlanMetrics(df, 1, Map(
+ 1L -> ("SortMergeJoin", Map(
+ // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+ "number of left rows" -> 4L,
+ "number of right rows" -> 2L,
+ "number of output rows" -> 4L)))
+ )
+ }
+ }
+ }
+
+ test("SortMergeOuterJoin metrics") {
+ // Because SortMergeOuterJoin may skip different rows if the number of partitions is different,
+ // this test should use a deterministic number of partitions.
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
+ val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
+ // Assume the execution plan is
+ // ... -> SortMergeOuterJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
+ testSparkPlanMetrics(df, 1, Map(
+ 1L -> ("SortMergeOuterJoin", Map(
+ // All 6 rows of testData2 are read because the streamed (left) side is fully consumed
+ "number of left rows" -> 6L,
+ "number of right rows" -> 2L,
+ "number of output rows" -> 8L)))
+ )
+
+ val df2 = sqlContext.sql(
+ "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
+ testSparkPlanMetrics(df2, 1, Map(
+ 1L -> ("SortMergeOuterJoin", Map(
+ // testDataForJoin has only 2 rows, so only 2 left rows are read
+ "number of left rows" -> 2L,
+ "number of right rows" -> 6L,
+ "number of output rows" -> 8L)))
+ )
+ }
+ }
+ }
+
+ test("BroadcastHashJoin metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+ val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key", "value")
+ // Assume the execution plan is
+ // ... -> BroadcastHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = df1.join(broadcast(df2), "key")
+ testSparkPlanMetrics(df, 2, Map(
+ 1L -> ("BroadcastHashJoin", Map(
+ "number of left rows" -> 2L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 2L)))
+ )
+ }
+ }
+
+ test("ShuffledHashJoin metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+ val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
+ // Assume the execution plan is
+ // ... -> ShuffledHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
+ testSparkPlanMetrics(df, 1, Map(
+ 1L -> ("ShuffledHashJoin", Map(
+ "number of left rows" -> 6L,
+ "number of right rows" -> 2L,
+ "number of output rows" -> 4L)))
+ )
+ }
+ }
+ }
+
+ test("ShuffledHashOuterJoin metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+ val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
+ val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> ShuffledHashOuterJoin(nodeId = 0)
+ val df = df1.join(df2, $"key" === $"key2", "left_outer")
+ testSparkPlanMetrics(df, 1, Map(
+ 0L -> ("ShuffledHashOuterJoin", Map(
+ "number of left rows" -> 3L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 5L)))
+ )
+
+ val df3 = df1.join(df2, $"key" === $"key2", "right_outer")
+ testSparkPlanMetrics(df3, 1, Map(
+ 0L -> ("ShuffledHashOuterJoin", Map(
+ "number of left rows" -> 3L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 6L)))
+ )
+
+ val df4 = df1.join(df2, $"key" === $"key2", "outer")
+ testSparkPlanMetrics(df4, 1, Map(
+ 0L -> ("ShuffledHashOuterJoin", Map(
+ "number of left rows" -> 3L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 7L)))
+ )
+ }
+ }
+
+ test("BroadcastHashOuterJoin metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+ val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
+ val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> BroadcastHashOuterJoin(nodeId = 0)
+ val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
+ testSparkPlanMetrics(df, 2, Map(
+ 0L -> ("BroadcastHashOuterJoin", Map(
+ "number of left rows" -> 3L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 5L)))
+ )
+
+ val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
+ testSparkPlanMetrics(df3, 2, Map(
+ 0L -> ("BroadcastHashOuterJoin", Map(
+ "number of left rows" -> 3L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 6L)))
+ )
+ }
+ }
+
+ test("BroadcastNestedLoopJoin metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
+ val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
+ // Assume the execution plan is
+ // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+ "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a")
+ testSparkPlanMetrics(df, 3, Map(
+ 1L -> ("BroadcastNestedLoopJoin", Map(
+ "number of left rows" -> 12L, // left needs to be scanned twice
+ "number of right rows" -> 2L,
+ "number of output rows" -> 12L)))
+ )
+ }
+ }
+ }
+
+ test("BroadcastLeftSemiJoinHash metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+ val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> BroadcastLeftSemiJoinHash(nodeId = 0)
+ val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
+ testSparkPlanMetrics(df, 2, Map(
+ 0L -> ("BroadcastLeftSemiJoinHash", Map(
+ "number of left rows" -> 2L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 2L)))
+ )
+ }
+ }
+
+ test("LeftSemiJoinHash metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+ val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> LeftSemiJoinHash(nodeId = 0)
+ val df = df1.join(df2, $"key" === $"key2", "leftsemi")
+ testSparkPlanMetrics(df, 1, Map(
+ 0L -> ("LeftSemiJoinHash", Map(
+ "number of left rows" -> 2L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 2L)))
+ )
+ }
+ }
+
+ test("LeftSemiJoinBNL metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+ val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> LeftSemiJoinBNL(nodeId = 0)
+ val df = df1.join(df2, $"key" < $"key2", "leftsemi")
+ testSparkPlanMetrics(df, 2, Map(
+ 0L -> ("LeftSemiJoinBNL", Map(
+ "number of left rows" -> 2L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 2L)))
+ )
+ }
+ }
+
+ test("CartesianProduct metrics") {
+ val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
+ // Assume the execution plan is
+ // ... -> CartesianProduct(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 JOIN testDataForJoin")
+ testSparkPlanMetrics(df, 1, Map(
+ 1L -> ("CartesianProduct", Map(
+ "number of left rows" -> 12L, // left needs to be scanned twice
+ "number of right rows" -> 12L, // right is read 6 times
+ "number of output rows" -> 12L)))
+ )
+ }
+ }
+
+ test("save metrics") {
+ withTempPath { file =>
+ val previousExecutionIds = TestSQLContext.listener.executionIdToData.keySet
+ // Assume the execution plan is
+ // PhysicalRDD(nodeId = 0)
+ TestData.person.select('name).write.format("json").save(file.getAbsolutePath)
+ TestSQLContext.sparkContext.listenerBus.waitUntilEmpty(10000)
+ val executionIds = TestSQLContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
+ assert(executionIds.size === 1)
+ val executionId = executionIds.head
+ val jobs = TestSQLContext.listener.getExecution(executionId).get.jobs
+ // Use "<=" because there is a race condition that we may miss some jobs
+ // TODO Change "<=" to "=" once we fix the race condition that missing the JobStarted event.
+ assert(jobs.size <= 1)
+ val metricValues = TestSQLContext.listener.getExecutionMetrics(executionId)
+ // Because "save" will create a new DataFrame internally, we cannot get the real metric id.
+ // However, we can still check the value.
+ assert(metricValues.values.toSeq === Seq(2L))
+ }
+ }
+
}
private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String)