aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-08-11 12:39:13 -0700
committerYin Huai <yhuai@databricks.com>2015-08-11 12:39:13 -0700
commit5831294a7a8fa2524133c5d718cbc8187d2b0620 (patch)
tree28b256179c248c11a322775f0aabf95cd9239bfd /sql
parent5b8bb1b213b8738f563fcd00747604410fbb3087 (diff)
downloadspark-5831294a7a8fa2524133c5d718cbc8187d2b0620.tar.gz
spark-5831294a7a8fa2524133c5d718cbc8187d2b0620.tar.bz2
spark-5831294a7a8fa2524133c5d718cbc8187d2b0620.zip
[SPARK-9646] [SQL] Add metrics for all join and aggregate operators
This PR added metrics for all join and aggregate operators. However, I found the metrics may be confusing in the following two case: 1. The iterator is not totally consumed and the metric values will be less. 2. Recreating the iterators will make metric values look bigger than the size of the input source, such as `CartesianProduct`. Author: zsxwing <zsxwing@gmail.com> Closes #8060 from zsxwing/sql-metrics and squashes the following commits: 40f3fc1 [zsxwing] Mark LongSQLMetric private[metric] to avoid using incorrectly and leak memory b1b9071 [zsxwing] Merge branch 'master' into sql-metrics 4bef25a [zsxwing] Add metrics for SortMergeOuterJoin 95ccfc6 [zsxwing] Merge branch 'master' into sql-metrics 67cb4dd [zsxwing] Add metrics for Project and TungstenProject; remove metrics from PhysicalRDD and LocalTableScan 0eb47d4 [zsxwing] Merge branch 'master' into sql-metrics dd9d932 [zsxwing] Avoid creating new Iterators 589ea26 [zsxwing] Add metrics for all join and aggregate operators
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala25
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala18
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala36
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala30
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala40
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala24
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala27
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala25
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala30
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala23
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala19
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala18
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala16
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala29
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala21
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala38
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala14
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala450
27 files changed, 847 insertions, 107 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index e8c6a0f8f8..f3b6a3a5f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -45,6 +46,10 @@ case class Aggregate(
child: SparkPlan)
extends UnaryNode {
+ override private[sql] lazy val metrics = Map(
+ "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def requiredChildDistribution: List[Distribution] = {
if (partial) {
UnspecifiedDistribution :: Nil
@@ -121,12 +126,15 @@ case class Aggregate(
}
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
+ val numInputRows = longMetric("numInputRows")
+ val numOutputRows = longMetric("numOutputRows")
if (groupingExpressions.isEmpty) {
child.execute().mapPartitions { iter =>
val buffer = newAggregateBuffer()
var currentRow: InternalRow = null
while (iter.hasNext) {
currentRow = iter.next()
+ numInputRows += 1
var i = 0
while (i < buffer.length) {
buffer(i).update(currentRow)
@@ -142,6 +150,7 @@ case class Aggregate(
i += 1
}
+ numOutputRows += 1
Iterator(resultProjection(aggregateResults))
}
} else {
@@ -152,6 +161,7 @@ case class Aggregate(
var currentRow: InternalRow = null
while (iter.hasNext) {
currentRow = iter.next()
+ numInputRows += 1
val currentGroup = groupingProjection(currentRow)
var currentBuffer = hashTable.get(currentGroup)
if (currentBuffer == null) {
@@ -180,6 +190,7 @@ case class Aggregate(
val currentEntry = hashTableIter.next()
val currentGroup = currentEntry.getKey
val currentBuffer = currentEntry.getValue
+ numOutputRows += 1
var i = 0
while (i < currentBuffer.length) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index cae7ca5cbd..abb60cf12e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -99,8 +99,6 @@ private[sql] case class PhysicalRDD(
rdd: RDD[InternalRow],
extraInformation: String) extends LeafNode {
- override protected[sql] val trackNumOfRowsEnabled = true
-
protected override def doExecute(): RDD[InternalRow] = rdd
override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 858dd85fd1..34e926e458 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -30,8 +30,6 @@ private[sql] case class LocalTableScan(
output: Seq[Attribute],
rows: Seq[InternalRow]) extends LeafNode {
- override protected[sql] val trackNumOfRowsEnabled = true
-
private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
protected override def doExecute(): RDD[InternalRow] = rdd
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 9ba5cf2d2b..72f5450510 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -81,22 +81,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}
/**
- * Whether track the number of rows output by this SparkPlan
- */
- protected[sql] def trackNumOfRowsEnabled: Boolean = false
-
- private lazy val defaultMetrics: Map[String, SQLMetric[_, _]] =
- if (trackNumOfRowsEnabled) {
- Map("numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
- }
- else {
- Map.empty
- }
-
- /**
* Return all metrics containing metrics of this SparkPlan.
*/
- private[sql] def metrics: Map[String, SQLMetric[_, _]] = defaultMetrics
+ private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty
/**
* Return a LongSQLMetric according to the name.
@@ -150,15 +137,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
prepare()
- if (trackNumOfRowsEnabled) {
- val numRows = longMetric("numRows")
- doExecute().map { row =>
- numRows += 1
- row
- }
- } else {
- doExecute()
- }
+ doExecute()
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index ad428ad663..ab26f9c58a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, ClusteredDistribution, AllTuples, Distribution}
import org.apache.spark.sql.execution.{UnsafeFixedWidthAggregationMap, SparkPlan, UnaryNode}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType
case class SortBasedAggregate(
@@ -38,6 +39,10 @@ case class SortBasedAggregate(
child: SparkPlan)
extends UnaryNode {
+ override private[sql] lazy val metrics = Map(
+ "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def outputsUnsafeRows: Boolean = false
override def canProcessUnsafeRows: Boolean = false
@@ -63,6 +68,8 @@ case class SortBasedAggregate(
}
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
+ val numInputRows = longMetric("numInputRows")
+ val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitions { iter =>
// Because the constructor of an aggregation iterator will read at least the first row,
// we need to get the value of iter.hasNext first.
@@ -84,10 +91,13 @@ case class SortBasedAggregate(
newProjection _,
child.output,
iter,
- outputsUnsafeRows)
+ outputsUnsafeRows,
+ numInputRows,
+ numOutputRows)
if (!hasInput && groupingExpressions.isEmpty) {
// There is no input and there is no grouping expressions.
// We need to output a single row as the output.
+ numOutputRows += 1
Iterator[InternalRow](outputIter.outputForEmptyGroupingKeyWithoutInput())
} else {
outputIter
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 67ebafde25..73d50e07cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2, AggregateFunction2}
+import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.unsafe.KVIterator
/**
@@ -37,7 +38,9 @@ class SortBasedAggregationIterator(
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
- outputsUnsafeRows: Boolean)
+ outputsUnsafeRows: Boolean,
+ numInputRows: LongSQLMetric,
+ numOutputRows: LongSQLMetric)
extends AggregationIterator(
groupingKeyAttributes,
valueAttributes,
@@ -103,6 +106,7 @@ class SortBasedAggregationIterator(
// Get the grouping key.
val groupingKey = inputKVIterator.getKey
val currentRow = inputKVIterator.getValue
+ numInputRows += 1
// Check if the current row belongs the current input row.
if (currentGroupingKey == groupingKey) {
@@ -137,7 +141,7 @@ class SortBasedAggregationIterator(
val outputRow = generateOutput(currentGroupingKey, sortBasedAggregationBuffer)
// Initialize buffer values for the next group.
initializeBuffer(sortBasedAggregationBuffer)
-
+ numOutputRows += 1
outputRow
} else {
// no more result
@@ -151,7 +155,7 @@ class SortBasedAggregationIterator(
nextGroupingKey = inputKVIterator.getKey().copy()
firstRowInNextGroup = inputKVIterator.getValue().copy()
-
+ numInputRows += 1
sortedInputHasNewGroup = true
} else {
// This inputIter is empty.
@@ -181,7 +185,9 @@ object SortBasedAggregationIterator {
newProjection: (Seq[Expression], Seq[Attribute]) => Projection,
inputAttributes: Seq[Attribute],
inputIter: Iterator[InternalRow],
- outputsUnsafeRows: Boolean): SortBasedAggregationIterator = {
+ outputsUnsafeRows: Boolean,
+ numInputRows: LongSQLMetric,
+ numOutputRows: LongSQLMetric): SortBasedAggregationIterator = {
val kvIterator = if (UnsafeProjection.canSupport(groupingExprs)) {
AggregationIterator.unsafeKVIterator(
groupingExprs,
@@ -202,7 +208,9 @@ object SortBasedAggregationIterator {
initialInputBufferOffset,
resultExpressions,
newMutableProjection,
- outputsUnsafeRows)
+ outputsUnsafeRows,
+ numInputRows,
+ numOutputRows)
}
// scalastyle:on
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 1694794a53..6b5935a7ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, ClusteredDistribution, AllTuples, Distribution}
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
case class TungstenAggregate(
requiredChildDistributionExpressions: Option[Seq[Expression]],
@@ -35,6 +36,10 @@ case class TungstenAggregate(
child: SparkPlan)
extends UnaryNode {
+ override private[sql] lazy val metrics = Map(
+ "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def outputsUnsafeRows: Boolean = true
override def canProcessUnsafeRows: Boolean = true
@@ -61,6 +66,8 @@ case class TungstenAggregate(
}
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
+ val numInputRows = longMetric("numInputRows")
+ val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitions { iter =>
val hasInput = iter.hasNext
if (!hasInput && groupingExpressions.nonEmpty) {
@@ -78,9 +85,12 @@ case class TungstenAggregate(
newMutableProjection,
child.output,
iter,
- testFallbackStartsAt)
+ testFallbackStartsAt,
+ numInputRows,
+ numOutputRows)
if (!hasInput && groupingExpressions.isEmpty) {
+ numOutputRows += 1
Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
} else {
aggregationIterator
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 32160906c3..1f383dd044 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{UnsafeKVExternalSorter, UnsafeFixedWidthAggregationMap}
+import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.sql.types.StructType
/**
@@ -83,7 +84,9 @@ class TungstenAggregationIterator(
newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
originalInputAttributes: Seq[Attribute],
inputIter: Iterator[InternalRow],
- testFallbackStartsAt: Option[Int])
+ testFallbackStartsAt: Option[Int],
+ numInputRows: LongSQLMetric,
+ numOutputRows: LongSQLMetric)
extends Iterator[UnsafeRow] with Logging {
///////////////////////////////////////////////////////////////////////////
@@ -352,6 +355,7 @@ class TungstenAggregationIterator(
private def processInputs(): Unit = {
while (!sortBased && inputIter.hasNext) {
val newInput = inputIter.next()
+ numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
if (buffer == null) {
@@ -371,6 +375,7 @@ class TungstenAggregationIterator(
var i = 0
while (!sortBased && inputIter.hasNext) {
val newInput = inputIter.next()
+ numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
val buffer: UnsafeRow = if (i < fallbackStartsAt) {
hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
@@ -439,6 +444,7 @@ class TungstenAggregationIterator(
// Process the rest of input rows.
while (inputIter.hasNext) {
val newInput = inputIter.next()
+ numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
buffer.copyFrom(initialAggregationBuffer)
processRow(buffer, newInput)
@@ -462,6 +468,7 @@ class TungstenAggregationIterator(
// Insert the rest of input rows.
while (inputIter.hasNext) {
val newInput = inputIter.next()
+ numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
bufferExtractor(newInput)
externalSorter.insertKV(groupingKey, buffer)
@@ -657,7 +664,7 @@ class TungstenAggregationIterator(
TaskContext.get().internalMetricsToAccumulators(
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemory)
}
-
+ numOutputRows += 1
res
} else {
// no more result
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index bf2de244c8..247c900baa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -41,11 +41,20 @@ import org.apache.spark.{HashPartitioner, SparkEnv}
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+ override private[sql] lazy val metrics = Map(
+ "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
+
@transient lazy val buildProjection = newMutableProjection(projectList, child.output)
- protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
- val reusableProjection = buildProjection()
- iter.map(reusableProjection)
+ protected override def doExecute(): RDD[InternalRow] = {
+ val numRows = longMetric("numRows")
+ child.execute().mapPartitions { iter =>
+ val reusableProjection = buildProjection()
+ iter.map { row =>
+ numRows += 1
+ reusableProjection(row)
+ }
+ }
}
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
@@ -57,19 +66,28 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
*/
case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
+ override private[sql] lazy val metrics = Map(
+ "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
+
override def outputsUnsafeRows: Boolean = true
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
- protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
- this.transformAllExpressions {
- case CreateStruct(children) => CreateStructUnsafe(children)
- case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
+ protected override def doExecute(): RDD[InternalRow] = {
+ val numRows = longMetric("numRows")
+ child.execute().mapPartitions { iter =>
+ this.transformAllExpressions {
+ case CreateStruct(children) => CreateStructUnsafe(children)
+ case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
+ }
+ val project = UnsafeProjection.create(projectList, child.output)
+ iter.map { row =>
+ numRows += 1
+ project(row)
+ }
}
- val project = UnsafeProjection.create(projectList, child.output)
- iter.map(project)
}
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index f7a68e4f5d..2e108cb814 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.ThreadUtils
import org.apache.spark.{InternalAccumulator, TaskContext}
@@ -45,7 +46,10 @@ case class BroadcastHashJoin(
right: SparkPlan)
extends BinaryNode with HashJoin {
- override protected[sql] val trackNumOfRowsEnabled = true
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
val timeout: Duration = {
val timeoutValue = sqlContext.conf.broadcastTimeout
@@ -65,6 +69,11 @@ case class BroadcastHashJoin(
// for the same query.
@transient
private lazy val broadcastFuture = {
+ val numBuildRows = buildSide match {
+ case BuildLeft => longMetric("numLeftRows")
+ case BuildRight => longMetric("numRightRows")
+ }
+
// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
future {
@@ -73,8 +82,15 @@ case class BroadcastHashJoin(
SQLExecution.withExecutionId(sparkContext, executionId) {
// Note that we use .execute().collect() because we don't want to convert data to Scala
// types
- val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
- val hashed = HashedRelation(input.iterator, buildSideKeyGenerator, input.size)
+ val input: Array[InternalRow] = buildPlan.execute().map { row =>
+ numBuildRows += 1
+ row.copy()
+ }.collect()
+ // The following line doesn't run in a job so we cannot track the metric value. However, we
+ // have already tracked it in the above lines. So here we can use
+ // `SQLMetrics.nullLongMetric` to ignore it.
+ val hashed = HashedRelation(
+ input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)
sparkContext.broadcast(hashed)
}
}(BroadcastHashJoin.broadcastHashJoinExecutionContext)
@@ -85,6 +101,12 @@ case class BroadcastHashJoin(
}
protected override def doExecute(): RDD[InternalRow] = {
+ val numStreamedRows = buildSide match {
+ case BuildLeft => longMetric("numRightRows")
+ case BuildRight => longMetric("numLeftRows")
+ }
+ val numOutputRows = longMetric("numOutputRows")
+
val broadcastRelation = Await.result(broadcastFuture, timeout)
streamedPlan.execute().mapPartitions { streamedIter =>
@@ -95,7 +117,7 @@ case class BroadcastHashJoin(
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
case _ =>
}
- hashJoin(streamedIter, hashedRelation)
+ hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index a3626de49a..69a8b95eaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.{InternalAccumulator, TaskContext}
/**
@@ -45,6 +46,11 @@ case class BroadcastHashOuterJoin(
left: SparkPlan,
right: SparkPlan) extends BinaryNode with HashOuterJoin {
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
val timeout = {
val timeoutValue = sqlContext.conf.broadcastTimeout
if (timeoutValue < 0) {
@@ -63,6 +69,14 @@ case class BroadcastHashOuterJoin(
// for the same query.
@transient
private lazy val broadcastFuture = {
+ val numBuildRows = joinType match {
+ case RightOuter => longMetric("numLeftRows")
+ case LeftOuter => longMetric("numRightRows")
+ case x =>
+ throw new IllegalArgumentException(
+ s"HashOuterJoin should not take $x as the JoinType")
+ }
+
// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
future {
@@ -71,8 +85,15 @@ case class BroadcastHashOuterJoin(
SQLExecution.withExecutionId(sparkContext, executionId) {
// Note that we use .execute().collect() because we don't want to convert data to Scala
// types
- val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
- val hashed = HashedRelation(input.iterator, buildKeyGenerator, input.size)
+ val input: Array[InternalRow] = buildPlan.execute().map { row =>
+ numBuildRows += 1
+ row.copy()
+ }.collect()
+ // The following line doesn't run in a job so we cannot track the metric value. However, we
+ // have already tracked it in the above lines. So here we can use
+ // `SQLMetrics.nullLongMetric` to ignore it.
+ val hashed = HashedRelation(
+ input.iterator, SQLMetrics.nullLongMetric, buildKeyGenerator, input.size)
sparkContext.broadcast(hashed)
}
}(BroadcastHashJoin.broadcastHashJoinExecutionContext)
@@ -83,6 +104,15 @@ case class BroadcastHashOuterJoin(
}
override def doExecute(): RDD[InternalRow] = {
+ val numStreamedRows = joinType match {
+ case RightOuter => longMetric("numRightRows")
+ case LeftOuter => longMetric("numLeftRows")
+ case x =>
+ throw new IllegalArgumentException(
+ s"HashOuterJoin should not take $x as the JoinType")
+ }
+ val numOutputRows = longMetric("numOutputRows")
+
val broadcastRelation = Await.result(broadcastFuture, timeout)
streamedPlan.execute().mapPartitions { streamedIter =>
@@ -101,16 +131,18 @@ case class BroadcastHashOuterJoin(
joinType match {
case LeftOuter =>
streamedIter.flatMap(currentRow => {
+ numStreamedRows += 1
val rowKey = keyGenerator(currentRow)
joinedRow.withLeft(currentRow)
- leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj)
+ leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj, numOutputRows)
})
case RightOuter =>
streamedIter.flatMap(currentRow => {
+ numStreamedRows += 1
val rowKey = keyGenerator(currentRow)
joinedRow.withRight(currentRow)
- rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj)
+ rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj, numOutputRows)
})
case x =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index 5bd06fbdca..78a8c16c62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -37,18 +38,31 @@ case class BroadcastLeftSemiJoinHash(
right: SparkPlan,
condition: Option[Expression]) extends BinaryNode with HashSemiJoin {
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
protected override def doExecute(): RDD[InternalRow] = {
- val input = right.execute().map(_.copy()).collect()
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
+ val input = right.execute().map { row =>
+ numRightRows += 1
+ row.copy()
+ }.collect()
if (condition.isEmpty) {
- val hashSet = buildKeyHashSet(input.toIterator)
+ val hashSet = buildKeyHashSet(input.toIterator, SQLMetrics.nullLongMetric)
val broadcastedRelation = sparkContext.broadcast(hashSet)
left.execute().mapPartitions { streamIter =>
- hashSemiJoin(streamIter, broadcastedRelation.value)
+ hashSemiJoin(streamIter, numLeftRows, broadcastedRelation.value, numOutputRows)
}
} else {
- val hashRelation = HashedRelation(input.toIterator, rightKeyGenerator, input.size)
+ val hashRelation =
+ HashedRelation(input.toIterator, SQLMetrics.nullLongMetric, rightKeyGenerator, input.size)
val broadcastedRelation = sparkContext.broadcast(hashRelation)
left.execute().mapPartitions { streamIter =>
@@ -59,7 +73,7 @@ case class BroadcastLeftSemiJoinHash(
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
case _ =>
}
- hashSemiJoin(streamIter, hashedRelation)
+ hashSemiJoin(streamIter, numLeftRows, hashedRelation, numOutputRows)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 017a44b9ca..28c88b1b03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.collection.CompactBuffer
/**
@@ -38,6 +39,11 @@ case class BroadcastNestedLoopJoin(
condition: Option[Expression]) extends BinaryNode {
// TODO: Override requiredChildDistribution.
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
/** BuildRight means the right relation <=> the broadcast relation. */
private val (streamed, broadcast) = buildSide match {
case BuildRight => (left, right)
@@ -75,9 +81,17 @@ case class BroadcastNestedLoopJoin(
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
protected override def doExecute(): RDD[InternalRow] = {
+ val (numStreamedRows, numBuildRows) = buildSide match {
+ case BuildRight => (longMetric("numLeftRows"), longMetric("numRightRows"))
+ case BuildLeft => (longMetric("numRightRows"), longMetric("numLeftRows"))
+ }
+ val numOutputRows = longMetric("numOutputRows")
+
val broadcastedRelation =
- sparkContext.broadcast(broadcast.execute().map(_.copy())
- .collect().toIndexedSeq)
+ sparkContext.broadcast(broadcast.execute().map { row =>
+ numBuildRows += 1
+ row.copy()
+ }.collect().toIndexedSeq)
/** All rows that either match both-way, or rows from streamed joined with nulls. */
val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
@@ -94,6 +108,7 @@ case class BroadcastNestedLoopJoin(
streamedIter.foreach { streamedRow =>
var i = 0
var streamRowMatched = false
+ numStreamedRows += 1
while (i < broadcastedRelation.value.size) {
val broadcastedRow = broadcastedRelation.value(i)
@@ -162,6 +177,12 @@ case class BroadcastNestedLoopJoin(
// TODO: Breaks lineage.
sparkContext.union(
- matchesOrStreamedRowsWithNulls.flatMap(_._1), sparkContext.makeRDD(broadcastRowsWithNulls))
+ matchesOrStreamedRowsWithNulls.flatMap(_._1),
+ sparkContext.makeRDD(broadcastRowsWithNulls)
+ ).map { row =>
+ // `broadcastRowsWithNulls` doesn't run in a job so that we have to track numOutputRows here.
+ numOutputRows += 1
+ row
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 261b472415..2115f40702 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -22,6 +22,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -30,13 +31,31 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output ++ right.output
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
protected override def doExecute(): RDD[InternalRow] = {
- val leftResults = left.execute().map(_.copy())
- val rightResults = right.execute().map(_.copy())
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
+ val leftResults = left.execute().map { row =>
+ numLeftRows += 1
+ row.copy()
+ }
+ val rightResults = right.execute().map { row =>
+ numRightRows += 1
+ row.copy()
+ }
leftResults.cartesian(rightResults).mapPartitions { iter =>
val joinedRow = new JoinedRow
- iter.map(r => joinedRow(r._1, r._2))
+ iter.map { r =>
+ numOutputRows += 1
+ joinedRow(r._1, r._2)
+ }
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 22d46d1c3e..7ce4a51783 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.joins
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.LongSQLMetric
trait HashJoin {
@@ -69,7 +70,9 @@ trait HashJoin {
protected def hashJoin(
streamIter: Iterator[InternalRow],
- hashedRelation: HashedRelation): Iterator[InternalRow] =
+ numStreamRows: LongSQLMetric,
+ hashedRelation: HashedRelation,
+ numOutputRows: LongSQLMetric): Iterator[InternalRow] =
{
new Iterator[InternalRow] {
private[this] var currentStreamedRow: InternalRow = _
@@ -98,6 +101,7 @@ trait HashJoin {
case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow)
}
currentMatchPosition += 1
+ numOutputRows += 1
resultProjection(ret)
}
@@ -113,6 +117,7 @@ trait HashJoin {
while (currentHashMatches == null && streamIter.hasNext) {
currentStreamedRow = streamIter.next()
+ numStreamRows += 1
val key = joinKeys(currentStreamedRow)
if (!key.anyNull) {
currentHashMatches = hashedRelation.get(key)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index 701bd3cd86..66903347c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.util.collection.CompactBuffer
@DeveloperApi
@@ -114,22 +115,28 @@ trait HashOuterJoin {
key: InternalRow,
joinedRow: JoinedRow,
rightIter: Iterable[InternalRow],
- resultProjection: InternalRow => InternalRow): Iterator[InternalRow] = {
+ resultProjection: InternalRow => InternalRow,
+ numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
val ret: Iterable[InternalRow] = {
if (!key.anyNull) {
val temp = if (rightIter != null) {
rightIter.collect {
- case r if boundCondition(joinedRow.withRight(r)) => resultProjection(joinedRow).copy()
+ case r if boundCondition(joinedRow.withRight(r)) => {
+ numOutputRows += 1
+ resultProjection(joinedRow).copy()
+ }
}
} else {
List.empty
}
if (temp.isEmpty) {
+ numOutputRows += 1
resultProjection(joinedRow.withRight(rightNullRow)) :: Nil
} else {
temp
}
} else {
+ numOutputRows += 1
resultProjection(joinedRow.withRight(rightNullRow)) :: Nil
}
}
@@ -140,22 +147,28 @@ trait HashOuterJoin {
key: InternalRow,
leftIter: Iterable[InternalRow],
joinedRow: JoinedRow,
- resultProjection: InternalRow => InternalRow): Iterator[InternalRow] = {
+ resultProjection: InternalRow => InternalRow,
+ numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
val ret: Iterable[InternalRow] = {
if (!key.anyNull) {
val temp = if (leftIter != null) {
leftIter.collect {
- case l if boundCondition(joinedRow.withLeft(l)) => resultProjection(joinedRow).copy()
+ case l if boundCondition(joinedRow.withLeft(l)) => {
+ numOutputRows += 1
+ resultProjection(joinedRow).copy()
+ }
}
} else {
List.empty
}
if (temp.isEmpty) {
+ numOutputRows += 1
resultProjection(joinedRow.withLeft(leftNullRow)) :: Nil
} else {
temp
}
} else {
+ numOutputRows += 1
resultProjection(joinedRow.withLeft(leftNullRow)) :: Nil
}
}
@@ -164,7 +177,7 @@ trait HashOuterJoin {
protected[this] def fullOuterIterator(
key: InternalRow, leftIter: Iterable[InternalRow], rightIter: Iterable[InternalRow],
- joinedRow: JoinedRow): Iterator[InternalRow] = {
+ joinedRow: JoinedRow, numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
if (!key.anyNull) {
// Store the positions of records in right, if one of its associated row satisfy
// the join condition.
@@ -177,6 +190,7 @@ trait HashOuterJoin {
// append them directly
case (r, idx) if boundCondition(joinedRow.withRight(r)) =>
+ numOutputRows += 1
matched = true
// if the row satisfy the join condition, add its index into the matched set
rightMatchedSet.add(idx)
@@ -189,6 +203,7 @@ trait HashOuterJoin {
// as we don't know whether we need to append it until finish iterating all
// of the records in right side.
// If we didn't get any proper row, then append a single row with empty right.
+ numOutputRows += 1
joinedRow.withRight(rightNullRow).copy()
})
} ++ rightIter.zipWithIndex.collect {
@@ -197,12 +212,15 @@ trait HashOuterJoin {
// Re-visiting the records in right, and append additional row with empty left, if its not
// in the matched set.
case (r, idx) if !rightMatchedSet.contains(idx) =>
+ numOutputRows += 1
joinedRow(leftNullRow, r).copy()
}
} else {
leftIter.iterator.map[InternalRow] { l =>
+ numOutputRows += 1
joinedRow(l, rightNullRow).copy()
} ++ rightIter.iterator.map[InternalRow] { r =>
+ numOutputRows += 1
joinedRow(leftNullRow, r).copy()
}
}
@@ -211,10 +229,12 @@ trait HashOuterJoin {
// This is only used by FullOuter
protected[this] def buildHashTable(
iter: Iterator[InternalRow],
+ numIterRows: LongSQLMetric,
keyGenerator: Projection): JavaHashMap[InternalRow, CompactBuffer[InternalRow]] = {
val hashTable = new JavaHashMap[InternalRow, CompactBuffer[InternalRow]]()
while (iter.hasNext) {
val currentRow = iter.next()
+ numIterRows += 1
val rowKey = keyGenerator(currentRow)
var existingMatchList = hashTable.get(rowKey)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index 82dd6eb7e7..beb141ade6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.joins
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.LongSQLMetric
trait HashSemiJoin {
@@ -61,13 +62,15 @@ trait HashSemiJoin {
@transient private lazy val boundCondition =
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
- protected def buildKeyHashSet(buildIter: Iterator[InternalRow]): java.util.Set[InternalRow] = {
+ protected def buildKeyHashSet(
+ buildIter: Iterator[InternalRow], numBuildRows: LongSQLMetric): java.util.Set[InternalRow] = {
val hashSet = new java.util.HashSet[InternalRow]()
// Create a Hash set of buildKeys
val rightKey = rightKeyGenerator
while (buildIter.hasNext) {
val currentRow = buildIter.next()
+ numBuildRows += 1
val rowKey = rightKey(currentRow)
if (!rowKey.anyNull) {
val keyExists = hashSet.contains(rowKey)
@@ -82,25 +85,35 @@ trait HashSemiJoin {
protected def hashSemiJoin(
streamIter: Iterator[InternalRow],
- hashSet: java.util.Set[InternalRow]): Iterator[InternalRow] = {
+ numStreamRows: LongSQLMetric,
+ hashSet: java.util.Set[InternalRow],
+ numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
val joinKeys = leftKeyGenerator
streamIter.filter(current => {
+ numStreamRows += 1
val key = joinKeys(current)
- !key.anyNull && hashSet.contains(key)
+ val r = !key.anyNull && hashSet.contains(key)
+ if (r) numOutputRows += 1
+ r
})
}
protected def hashSemiJoin(
streamIter: Iterator[InternalRow],
- hashedRelation: HashedRelation): Iterator[InternalRow] = {
+ numStreamRows: LongSQLMetric,
+ hashedRelation: HashedRelation,
+ numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
val joinKeys = leftKeyGenerator
val joinedRow = new JoinedRow
streamIter.filter { current =>
+ numStreamRows += 1
val key = joinKeys(current)
lazy val rowBuffer = hashedRelation.get(key)
- !key.anyNull && rowBuffer != null && rowBuffer.exists {
+ val r = !key.anyNull && rowBuffer != null && rowBuffer.exists {
(row: InternalRow) => boundCondition(joinedRow(current, row))
}
+ if (r) numOutputRows += 1
+ r
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 63d35d0f02..c1bc7947aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -25,6 +25,7 @@ import org.apache.spark.shuffle.ShuffleMemoryManager
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkSqlSerializer
+import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
@@ -112,11 +113,13 @@ private[joins] object HashedRelation {
def apply(
input: Iterator[InternalRow],
+ numInputRows: LongSQLMetric,
keyGenerator: Projection,
sizeEstimate: Int = 64): HashedRelation = {
if (keyGenerator.isInstanceOf[UnsafeProjection]) {
- return UnsafeHashedRelation(input, keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate)
+ return UnsafeHashedRelation(
+ input, numInputRows, keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate)
}
// TODO: Use Spark's HashMap implementation.
@@ -130,6 +133,7 @@ private[joins] object HashedRelation {
// Create a mapping of buildKeys -> rows
while (input.hasNext) {
currentRow = input.next()
+ numInputRows += 1
val rowKey = keyGenerator(currentRow)
if (!rowKey.anyNull) {
val existingMatchList = hashTable.get(rowKey)
@@ -331,6 +335,7 @@ private[joins] object UnsafeHashedRelation {
def apply(
input: Iterator[InternalRow],
+ numInputRows: LongSQLMetric,
keyGenerator: UnsafeProjection,
sizeEstimate: Int): HashedRelation = {
@@ -340,6 +345,7 @@ private[joins] object UnsafeHashedRelation {
// Create a mapping of buildKeys -> rows
while (input.hasNext) {
val unsafeRow = input.next().asInstanceOf[UnsafeRow]
+ numInputRows += 1
val rowKey = keyGenerator(unsafeRow)
if (!rowKey.anyNull) {
val existingMatchList = hashTable.get(rowKey)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index 4443455ef1..ad6362542f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -35,6 +36,11 @@ case class LeftSemiJoinBNL(
extends BinaryNode {
// TODO: Override requiredChildDistribution.
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def outputPartitioning: Partitioning = streamed.outputPartitioning
override def output: Seq[Attribute] = left.output
@@ -52,13 +58,21 @@ case class LeftSemiJoinBNL(
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
protected override def doExecute(): RDD[InternalRow] = {
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
val broadcastedRelation =
- sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
+ sparkContext.broadcast(broadcast.execute().map { row =>
+ numRightRows += 1
+ row.copy()
+ }.collect().toIndexedSeq)
streamed.execute().mapPartitions { streamedIter =>
val joinedRow = new JoinedRow
streamedIter.filter(streamedRow => {
+ numLeftRows += 1
var i = 0
var matched = false
@@ -69,6 +83,9 @@ case class LeftSemiJoinBNL(
}
i += 1
}
+ if (matched) {
+ numOutputRows += 1
+ }
matched
})
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index 68ccd34d8e..18808adaac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, Distribution, ClusteredDistribution}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -37,19 +38,28 @@ case class LeftSemiJoinHash(
right: SparkPlan,
condition: Option[Expression]) extends BinaryNode with HashSemiJoin {
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def outputPartitioning: Partitioning = left.outputPartitioning
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
protected override def doExecute(): RDD[InternalRow] = {
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) =>
if (condition.isEmpty) {
- val hashSet = buildKeyHashSet(buildIter)
- hashSemiJoin(streamIter, hashSet)
+ val hashSet = buildKeyHashSet(buildIter, numRightRows)
+ hashSemiJoin(streamIter, numLeftRows, hashSet, numOutputRows)
} else {
- val hashRelation = HashedRelation(buildIter, rightKeyGenerator)
- hashSemiJoin(streamIter, hashRelation)
+ val hashRelation = HashedRelation(buildIter, numRightRows, rightKeyGenerator)
+ hashSemiJoin(streamIter, numLeftRows, hashRelation, numOutputRows)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index c923dc837c..fc8c9439a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -38,7 +39,10 @@ case class ShuffledHashJoin(
right: SparkPlan)
extends BinaryNode with HashJoin {
- override protected[sql] val trackNumOfRowsEnabled = true
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
override def outputPartitioning: Partitioning =
PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
@@ -47,9 +51,15 @@ case class ShuffledHashJoin(
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
protected override def doExecute(): RDD[InternalRow] = {
+ val (numBuildRows, numStreamedRows) = buildSide match {
+ case BuildLeft => (longMetric("numLeftRows"), longMetric("numRightRows"))
+ case BuildRight => (longMetric("numRightRows"), longMetric("numLeftRows"))
+ }
+ val numOutputRows = longMetric("numOutputRows")
+
buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
- val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
- hashJoin(streamIter, hashed)
+ val hashed = HashedRelation(buildIter, numBuildRows, buildSideKeyGenerator)
+ hashJoin(streamIter, numStreamedRows, hashed, numOutputRows)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
index 6a8c35efca..ed282f98b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* :: DeveloperApi ::
@@ -41,6 +42,11 @@ case class ShuffledHashOuterJoin(
left: SparkPlan,
right: SparkPlan) extends BinaryNode with HashOuterJoin {
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
@@ -53,39 +59,48 @@ case class ShuffledHashOuterJoin(
}
protected override def doExecute(): RDD[InternalRow] = {
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
val joinedRow = new JoinedRow()
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
// TODO this probably can be replaced by external sort (sort merged join?)
joinType match {
case LeftOuter =>
- val hashed = HashedRelation(rightIter, buildKeyGenerator)
+ val hashed = HashedRelation(rightIter, numRightRows, buildKeyGenerator)
val keyGenerator = streamedKeyGenerator
val resultProj = resultProjection
leftIter.flatMap( currentRow => {
+ numLeftRows += 1
val rowKey = keyGenerator(currentRow)
joinedRow.withLeft(currentRow)
- leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey), resultProj)
+ leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey), resultProj, numOutputRows)
})
case RightOuter =>
- val hashed = HashedRelation(leftIter, buildKeyGenerator)
+ val hashed = HashedRelation(leftIter, numLeftRows, buildKeyGenerator)
val keyGenerator = streamedKeyGenerator
val resultProj = resultProjection
rightIter.flatMap ( currentRow => {
+ numRightRows += 1
val rowKey = keyGenerator(currentRow)
joinedRow.withRight(currentRow)
- rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow, resultProj)
+ rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow, resultProj, numOutputRows)
})
case FullOuter =>
// TODO(davies): use UnsafeRow
- val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
- val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
+ val leftHashTable =
+ buildHashTable(leftIter, numLeftRows, newProjection(leftKeys, left.output))
+ val rightHashTable =
+ buildHashTable(rightIter, numRightRows, newProjection(rightKeys, right.output))
(leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
fullOuterIterator(key,
leftHashTable.getOrElse(key, EMPTY_LIST),
rightHashTable.getOrElse(key, EMPTY_LIST),
- joinedRow)
+ joinedRow,
+ numOutputRows)
}
case x =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 6d656ea284..6b7322671d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{BinaryNode, RowIterator, SparkPlan}
+import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
/**
* :: DeveloperApi ::
@@ -37,6 +38,11 @@ case class SortMergeJoin(
left: SparkPlan,
right: SparkPlan) extends BinaryNode {
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def output: Seq[Attribute] = left.output ++ right.output
override def outputPartitioning: Partitioning =
@@ -70,6 +76,10 @@ case class SortMergeJoin(
}
protected override def doExecute(): RDD[InternalRow] = {
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
new RowIterator {
// An ordering that can be used to compare keys from both sides.
@@ -82,7 +92,9 @@ case class SortMergeJoin(
rightKeyGenerator,
keyOrdering,
RowIterator.fromScala(leftIter),
- RowIterator.fromScala(rightIter)
+ numLeftRows,
+ RowIterator.fromScala(rightIter),
+ numRightRows
)
private[this] val joinRow = new JoinedRow
private[this] val resultProjection: (InternalRow) => InternalRow = {
@@ -108,6 +120,7 @@ case class SortMergeJoin(
if (currentLeftRow != null) {
joinRow(currentLeftRow, currentRightMatches(currentMatchIdx))
currentMatchIdx += 1
+ numOutputRows += 1
true
} else {
false
@@ -144,7 +157,9 @@ private[joins] class SortMergeJoinScanner(
bufferedKeyGenerator: Projection,
keyOrdering: Ordering[InternalRow],
streamedIter: RowIterator,
- bufferedIter: RowIterator) {
+ numStreamedRows: LongSQLMetric,
+ bufferedIter: RowIterator,
+ numBufferedRows: LongSQLMetric) {
private[this] var streamedRow: InternalRow = _
private[this] var streamedRowKey: InternalRow = _
private[this] var bufferedRow: InternalRow = _
@@ -269,6 +284,7 @@ private[joins] class SortMergeJoinScanner(
if (streamedIter.advanceNext()) {
streamedRow = streamedIter.getRow
streamedRowKey = streamedKeyGenerator(streamedRow)
+ numStreamedRows += 1
true
} else {
streamedRow = null
@@ -286,6 +302,7 @@ private[joins] class SortMergeJoinScanner(
while (!foundRow && bufferedIter.advanceNext()) {
bufferedRow = bufferedIter.getRow
bufferedRowKey = bufferedKeyGenerator(bufferedRow)
+ numBufferedRows += 1
foundRow = !bufferedRowKey.anyNull
}
if (!foundRow) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
index 5326966b07..dea9e5e580 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{BinaryNode, RowIterator, SparkPlan}
+import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
/**
* :: DeveloperApi ::
@@ -40,6 +41,11 @@ case class SortMergeOuterJoin(
left: SparkPlan,
right: SparkPlan) extends BinaryNode {
+ override private[sql] lazy val metrics = Map(
+ "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+ "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
override def output: Seq[Attribute] = {
joinType match {
case LeftOuter =>
@@ -108,6 +114,10 @@ case class SortMergeOuterJoin(
}
override def doExecute(): RDD[InternalRow] = {
+ val numLeftRows = longMetric("numLeftRows")
+ val numRightRows = longMetric("numRightRows")
+ val numOutputRows = longMetric("numOutputRows")
+
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
// An ordering that can be used to compare keys from both sides.
val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
@@ -133,10 +143,13 @@ case class SortMergeOuterJoin(
bufferedKeyGenerator = createRightKeyGenerator(),
keyOrdering,
streamedIter = RowIterator.fromScala(leftIter),
- bufferedIter = RowIterator.fromScala(rightIter)
+ numLeftRows,
+ bufferedIter = RowIterator.fromScala(rightIter),
+ numRightRows
)
val rightNullRow = new GenericInternalRow(right.output.length)
- new LeftOuterIterator(smjScanner, rightNullRow, boundCondition, resultProj).toScala
+ new LeftOuterIterator(
+ smjScanner, rightNullRow, boundCondition, resultProj, numOutputRows).toScala
case RightOuter =>
val smjScanner = new SortMergeJoinScanner(
@@ -144,10 +157,13 @@ case class SortMergeOuterJoin(
bufferedKeyGenerator = createLeftKeyGenerator(),
keyOrdering,
streamedIter = RowIterator.fromScala(rightIter),
- bufferedIter = RowIterator.fromScala(leftIter)
+ numRightRows,
+ bufferedIter = RowIterator.fromScala(leftIter),
+ numLeftRows
)
val leftNullRow = new GenericInternalRow(left.output.length)
- new RightOuterIterator(smjScanner, leftNullRow, boundCondition, resultProj).toScala
+ new RightOuterIterator(
+ smjScanner, leftNullRow, boundCondition, resultProj, numOutputRows).toScala
case x =>
throw new IllegalArgumentException(
@@ -162,7 +178,8 @@ private class LeftOuterIterator(
smjScanner: SortMergeJoinScanner,
rightNullRow: InternalRow,
boundCondition: InternalRow => Boolean,
- resultProj: InternalRow => InternalRow
+ resultProj: InternalRow => InternalRow,
+ numRows: LongSQLMetric
) extends RowIterator {
private[this] val joinedRow: JoinedRow = new JoinedRow()
private[this] var rightIdx: Int = 0
@@ -198,7 +215,9 @@ private class LeftOuterIterator(
}
override def advanceNext(): Boolean = {
- advanceRightUntilBoundConditionSatisfied() || advanceLeft()
+ val r = advanceRightUntilBoundConditionSatisfied() || advanceLeft()
+ if (r) numRows += 1
+ r
}
override def getRow: InternalRow = resultProj(joinedRow)
@@ -208,7 +227,8 @@ private class RightOuterIterator(
smjScanner: SortMergeJoinScanner,
leftNullRow: InternalRow,
boundCondition: InternalRow => Boolean,
- resultProj: InternalRow => InternalRow
+ resultProj: InternalRow => InternalRow,
+ numRows: LongSQLMetric
) extends RowIterator {
private[this] val joinedRow: JoinedRow = new JoinedRow()
private[this] var leftIdx: Int = 0
@@ -244,7 +264,9 @@ private class RightOuterIterator(
}
override def advanceNext(): Boolean = {
- advanceLeftUntilBoundConditionSatisfied() || advanceRight()
+ val r = advanceLeftUntilBoundConditionSatisfied() || advanceRight()
+ if (r) numRows += 1
+ r
}
override def getRow: InternalRow = resultProj(joinedRow)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 1b51a5e5c8..7a2a98ec18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -112,4 +112,10 @@ private[sql] object SQLMetrics {
sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}
+
+ /**
+ * A metric that its value will be ignored. Use this one when we need a metric parameter but don't
+ * care about the value.
+ */
+ val nullLongMetric = new LongSQLMetric("null")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 8b1a9b21a9..a1fa2c3864 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -22,6 +22,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.collection.CompactBuffer
@@ -35,7 +37,8 @@ class HashedRelationSuite extends SparkFunSuite {
test("GeneralHashedRelation") {
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
- val hashed = HashedRelation(data.iterator, keyProjection)
+ val numDataRows = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "data")
+ val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
assert(hashed.isInstanceOf[GeneralHashedRelation])
assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0)))
@@ -45,11 +48,13 @@ class HashedRelationSuite extends SparkFunSuite {
val data2 = CompactBuffer[InternalRow](data(2))
data2 += data(2)
assert(hashed.get(data(2)) === data2)
+ assert(numDataRows.value.value === data.length)
}
test("UniqueKeyHashedRelation") {
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2))
- val hashed = HashedRelation(data.iterator, keyProjection)
+ val numDataRows = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "data")
+ val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
assert(hashed.isInstanceOf[UniqueKeyHashedRelation])
assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0)))
@@ -62,17 +67,19 @@ class HashedRelationSuite extends SparkFunSuite {
assert(uniqHashed.getValue(data(1)) === data(1))
assert(uniqHashed.getValue(data(2)) === data(2))
assert(uniqHashed.getValue(InternalRow(10)) === null)
+ assert(numDataRows.value.value === data.length)
}
test("UnsafeHashedRelation") {
val schema = StructType(StructField("a", IntegerType, true) :: Nil)
val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
+ val numDataRows = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "data")
val toUnsafe = UnsafeProjection.create(schema)
val unsafeData = data.map(toUnsafe(_).copy()).toArray
val buildKey = Seq(BoundReference(0, IntegerType, false))
val keyGenerator = UnsafeProjection.create(buildKey)
- val hashed = UnsafeHashedRelation(unsafeData.iterator, keyGenerator, 1)
+ val hashed = UnsafeHashedRelation(unsafeData.iterator, numDataRows, keyGenerator, 1)
assert(hashed.isInstanceOf[UnsafeHashedRelation])
assert(hashed.get(unsafeData(0)) === CompactBuffer[InternalRow](unsafeData(0)))
@@ -94,5 +101,6 @@ class HashedRelationSuite extends SparkFunSuite {
assert(hashed2.get(unsafeData(1)) === CompactBuffer[InternalRow](unsafeData(1)))
assert(hashed2.get(toUnsafe(InternalRow(10))) === null)
assert(hashed2.get(unsafeData(2)) === data2)
+ assert(numDataRows.value.value === data.length)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 953284c98b..7383d3f8fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -25,15 +25,24 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.ui.SparkPlanGraph
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.{SQLTestUtils, TestSQLContext}
import org.apache.spark.util.Utils
+class SQLMetricsSuite extends SparkFunSuite with SQLTestUtils {
-class SQLMetricsSuite extends SparkFunSuite {
+ override val sqlContext = TestSQLContext
+
+ import sqlContext.implicits._
test("LongSQLMetric should not box Long") {
val l = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "long")
- val f = () => { l += 1L }
+ val f = () => {
+ l += 1L
+ l.add(1L)
+ }
BoxingFinder.getClassReader(f.getClass).foreach { cl =>
val boxingFinder = new BoxingFinder()
cl.accept(boxingFinder, 0)
@@ -51,6 +60,441 @@ class SQLMetricsSuite extends SparkFunSuite {
assert(boxingFinder.boxingInvokes.nonEmpty, "Found find boxing in this test")
}
}
+
+ /**
+ * Call `df.collect()` and verify if the collected metrics are same as "expectedMetrics".
+ *
+ * @param df `DataFrame` to run
+ * @param expectedNumOfJobs number of jobs that will run
+ * @param expectedMetrics the expected metrics. The format is
+ * `nodeId -> (operatorName, metric name -> metric value)`.
+ */
+ private def testSparkPlanMetrics(
+ df: DataFrame,
+ expectedNumOfJobs: Int,
+ expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
+ val previousExecutionIds = TestSQLContext.listener.executionIdToData.keySet
+ df.collect()
+ TestSQLContext.sparkContext.listenerBus.waitUntilEmpty(10000)
+ val executionIds = TestSQLContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
+ assert(executionIds.size === 1)
+ val executionId = executionIds.head
+ val jobs = TestSQLContext.listener.getExecution(executionId).get.jobs
+ // Use "<=" because there is a race condition that we may miss some jobs
+ // TODO Change it to "=" once we fix the race condition that missing the JobStarted event.
+ assert(jobs.size <= expectedNumOfJobs)
+ if (jobs.size == expectedNumOfJobs) {
+ // If we can track all jobs, check the metric values
+ val metricValues = TestSQLContext.listener.getExecutionMetrics(executionId)
+ val actualMetrics = SparkPlanGraph(df.queryExecution.executedPlan).nodes.filter { node =>
+ expectedMetrics.contains(node.id)
+ }.map { node =>
+ val nodeMetrics = node.metrics.map { metric =>
+ val metricValue = metricValues(metric.accumulatorId)
+ (metric.name, metricValue)
+ }.toMap
+ (node.id, node.name -> nodeMetrics)
+ }.toMap
+ assert(expectedMetrics === actualMetrics)
+ } else {
+ // TODO Remove this "else" once we fix the race condition that missing the JobStarted event.
+ // Since we cannot track all jobs, the metric values could be wrong and we should not check
+ // them.
+ logWarning("Due to a race condition, we miss some jobs and cannot verify the metric values")
+ }
+ }
+
+ test("Project metrics") {
+ withSQLConf(
+ SQLConf.UNSAFE_ENABLED.key -> "false",
+ SQLConf.CODEGEN_ENABLED.key -> "false",
+ SQLConf.TUNGSTEN_ENABLED.key -> "false") {
+ // Assume the execution plan is
+ // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
+ val df = TestData.person.select('name)
+ testSparkPlanMetrics(df, 1, Map(
+ 0L ->("Project", Map(
+ "number of rows" -> 2L)))
+ )
+ }
+ }
+
+ test("TungstenProject metrics") {
+ withSQLConf(
+ SQLConf.UNSAFE_ENABLED.key -> "true",
+ SQLConf.CODEGEN_ENABLED.key -> "true",
+ SQLConf.TUNGSTEN_ENABLED.key -> "true") {
+ // Assume the execution plan is
+ // PhysicalRDD(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = TestData.person.select('name)
+ testSparkPlanMetrics(df, 1, Map(
+ 0L ->("TungstenProject", Map(
+ "number of rows" -> 2L)))
+ )
+ }
+ }
+
+ test("Filter metrics") {
+ // Assume the execution plan is
+ // PhysicalRDD(nodeId = 1) -> Filter(nodeId = 0)
+ val df = TestData.person.filter('age < 25)
+ testSparkPlanMetrics(df, 1, Map(
+ 0L -> ("Filter", Map(
+ "number of input rows" -> 2L,
+ "number of output rows" -> 1L)))
+ )
+ }
+
+ test("Aggregate metrics") {
+ withSQLConf(
+ SQLConf.UNSAFE_ENABLED.key -> "false",
+ SQLConf.CODEGEN_ENABLED.key -> "false",
+ SQLConf.TUNGSTEN_ENABLED.key -> "false") {
+ // Assume the execution plan is
+ // ... -> Aggregate(nodeId = 2) -> TungstenExchange(nodeId = 1) -> Aggregate(nodeId = 0)
+ val df = TestData.testData2.groupBy().count() // 2 partitions
+ testSparkPlanMetrics(df, 1, Map(
+ 2L -> ("Aggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 2L)),
+ 0L -> ("Aggregate", Map(
+ "number of input rows" -> 2L,
+ "number of output rows" -> 1L)))
+ )
+
+ // 2 partitions and each partition contains 2 keys
+ val df2 = TestData.testData2.groupBy('a).count()
+ testSparkPlanMetrics(df2, 1, Map(
+ 2L -> ("Aggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 4L)),
+ 0L -> ("Aggregate", Map(
+ "number of input rows" -> 4L,
+ "number of output rows" -> 3L)))
+ )
+ }
+ }
+
+ test("SortBasedAggregate metrics") {
+ // Because SortBasedAggregate may skip different rows if the number of partitions is different,
+ // this test should use the deterministic number of partitions.
+ withSQLConf(
+ SQLConf.UNSAFE_ENABLED.key -> "false",
+ SQLConf.CODEGEN_ENABLED.key -> "true",
+ SQLConf.TUNGSTEN_ENABLED.key -> "true") {
+ // Assume the execution plan is
+ // ... -> SortBasedAggregate(nodeId = 2) -> TungstenExchange(nodeId = 1) ->
+ // SortBasedAggregate(nodeId = 0)
+ val df = TestData.testData2.groupBy().count() // 2 partitions
+ testSparkPlanMetrics(df, 1, Map(
+ 2L -> ("SortBasedAggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 2L)),
+ 0L -> ("SortBasedAggregate", Map(
+ "number of input rows" -> 2L,
+ "number of output rows" -> 1L)))
+ )
+
+ // Assume the execution plan is
+ // ... -> SortBasedAggregate(nodeId = 3) -> TungstenExchange(nodeId = 2)
+ // -> ExternalSort(nodeId = 1)-> SortBasedAggregate(nodeId = 0)
+ // 2 partitions and each partition contains 2 keys
+ val df2 = TestData.testData2.groupBy('a).count()
+ testSparkPlanMetrics(df2, 1, Map(
+ 3L -> ("SortBasedAggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 4L)),
+ 0L -> ("SortBasedAggregate", Map(
+ "number of input rows" -> 4L,
+ "number of output rows" -> 3L)))
+ )
+ }
+ }
+
+ test("TungstenAggregate metrics") {
+ withSQLConf(
+ SQLConf.UNSAFE_ENABLED.key -> "true",
+ SQLConf.CODEGEN_ENABLED.key -> "true",
+ SQLConf.TUNGSTEN_ENABLED.key -> "true") {
+ // Assume the execution plan is
+ // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
+ // -> TungstenAggregate(nodeId = 0)
+ val df = TestData.testData2.groupBy().count() // 2 partitions
+ testSparkPlanMetrics(df, 1, Map(
+ 2L -> ("TungstenAggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 2L)),
+ 0L -> ("TungstenAggregate", Map(
+ "number of input rows" -> 2L,
+ "number of output rows" -> 1L)))
+ )
+
+ // 2 partitions and each partition contains 2 keys
+ val df2 = TestData.testData2.groupBy('a).count()
+ testSparkPlanMetrics(df2, 1, Map(
+ 2L -> ("TungstenAggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 4L)),
+ 0L -> ("TungstenAggregate", Map(
+ "number of input rows" -> 4L,
+ "number of output rows" -> 3L)))
+ )
+ }
+ }
+
+ test("SortMergeJoin metrics") {
+ // Because SortMergeJoin may skip different rows if the number of partitions is different, this
+ // test should use the deterministic number of partitions.
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
+ val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
+ // Assume the execution plan is
+ // ... -> SortMergeJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
+ testSparkPlanMetrics(df, 1, Map(
+ 1L -> ("SortMergeJoin", Map(
+ // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+ "number of left rows" -> 4L,
+ "number of right rows" -> 2L,
+ "number of output rows" -> 4L)))
+ )
+ }
+ }
+ }
+
+ test("SortMergeOuterJoin metrics") {
+ // Because SortMergeOuterJoin may skip different rows if the number of partitions is different,
+ // this test should use the deterministic number of partitions.
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
+ val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
+ // Assume the execution plan is
+ // ... -> SortMergeOuterJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
+ testSparkPlanMetrics(df, 1, Map(
+ 1L -> ("SortMergeOuterJoin", Map(
+ // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+ "number of left rows" -> 6L,
+ "number of right rows" -> 2L,
+ "number of output rows" -> 8L)))
+ )
+
+ val df2 = sqlContext.sql(
+ "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
+ testSparkPlanMetrics(df2, 1, Map(
+ 1L -> ("SortMergeOuterJoin", Map(
+ // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+ "number of left rows" -> 2L,
+ "number of right rows" -> 6L,
+ "number of output rows" -> 8L)))
+ )
+ }
+ }
+ }
+
+ test("BroadcastHashJoin metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+ val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key", "value")
+ // Assume the execution plan is
+ // ... -> BroadcastHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = df1.join(broadcast(df2), "key")
+ testSparkPlanMetrics(df, 2, Map(
+ 1L -> ("BroadcastHashJoin", Map(
+ "number of left rows" -> 2L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 2L)))
+ )
+ }
+ }
+
+ test("ShuffledHashJoin metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+ val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
+ // Assume the execution plan is
+ // ... -> ShuffledHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
+ testSparkPlanMetrics(df, 1, Map(
+ 1L -> ("ShuffledHashJoin", Map(
+ "number of left rows" -> 6L,
+ "number of right rows" -> 2L,
+ "number of output rows" -> 4L)))
+ )
+ }
+ }
+ }
+
+ test("ShuffledHashOuterJoin metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+ val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
+ val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> ShuffledHashOuterJoin(nodeId = 0)
+ val df = df1.join(df2, $"key" === $"key2", "left_outer")
+ testSparkPlanMetrics(df, 1, Map(
+ 0L -> ("ShuffledHashOuterJoin", Map(
+ "number of left rows" -> 3L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 5L)))
+ )
+
+ val df3 = df1.join(df2, $"key" === $"key2", "right_outer")
+ testSparkPlanMetrics(df3, 1, Map(
+ 0L -> ("ShuffledHashOuterJoin", Map(
+ "number of left rows" -> 3L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 6L)))
+ )
+
+ val df4 = df1.join(df2, $"key" === $"key2", "outer")
+ testSparkPlanMetrics(df4, 1, Map(
+ 0L -> ("ShuffledHashOuterJoin", Map(
+ "number of left rows" -> 3L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 7L)))
+ )
+ }
+ }
+
+ test("BroadcastHashOuterJoin metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+ val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
+ val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> BroadcastHashOuterJoin(nodeId = 0)
+ val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
+ testSparkPlanMetrics(df, 2, Map(
+ 0L -> ("BroadcastHashOuterJoin", Map(
+ "number of left rows" -> 3L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 5L)))
+ )
+
+ val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
+ testSparkPlanMetrics(df3, 2, Map(
+ 0L -> ("BroadcastHashOuterJoin", Map(
+ "number of left rows" -> 3L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 6L)))
+ )
+ }
+ }
+
+ test("BroadcastNestedLoopJoin metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
+ val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
+ // Assume the execution plan is
+ // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+ "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a")
+ testSparkPlanMetrics(df, 3, Map(
+ 1L -> ("BroadcastNestedLoopJoin", Map(
+ "number of left rows" -> 12L, // left needs to be scanned twice
+ "number of right rows" -> 2L,
+ "number of output rows" -> 12L)))
+ )
+ }
+ }
+ }
+
+ test("BroadcastLeftSemiJoinHash metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+ val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> BroadcastLeftSemiJoinHash(nodeId = 0)
+ val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
+ testSparkPlanMetrics(df, 2, Map(
+ 0L -> ("BroadcastLeftSemiJoinHash", Map(
+ "number of left rows" -> 2L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 2L)))
+ )
+ }
+ }
+
+ test("LeftSemiJoinHash metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+ val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> LeftSemiJoinHash(nodeId = 0)
+ val df = df1.join(df2, $"key" === $"key2", "leftsemi")
+ testSparkPlanMetrics(df, 1, Map(
+ 0L -> ("LeftSemiJoinHash", Map(
+ "number of left rows" -> 2L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 2L)))
+ )
+ }
+ }
+
+ test("LeftSemiJoinBNL metrics") {
+ withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+ val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> LeftSemiJoinBNL(nodeId = 0)
+ val df = df1.join(df2, $"key" < $"key2", "leftsemi")
+ testSparkPlanMetrics(df, 2, Map(
+ 0L -> ("LeftSemiJoinBNL", Map(
+ "number of left rows" -> 2L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 2L)))
+ )
+ }
+ }
+
+ test("CartesianProduct metrics") {
+ val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
+ // Assume the execution plan is
+ // ... -> CartesianProduct(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 JOIN testDataForJoin")
+ testSparkPlanMetrics(df, 1, Map(
+ 1L -> ("CartesianProduct", Map(
+ "number of left rows" -> 12L, // left needs to be scanned twice
+ "number of right rows" -> 12L, // right is read 6 times
+ "number of output rows" -> 12L)))
+ )
+ }
+ }
+
+ test("save metrics") {
+ withTempPath { file =>
+ val previousExecutionIds = TestSQLContext.listener.executionIdToData.keySet
+ // Assume the execution plan is
+ // PhysicalRDD(nodeId = 0)
+ TestData.person.select('name).write.format("json").save(file.getAbsolutePath)
+ TestSQLContext.sparkContext.listenerBus.waitUntilEmpty(10000)
+ val executionIds = TestSQLContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
+ assert(executionIds.size === 1)
+ val executionId = executionIds.head
+ val jobs = TestSQLContext.listener.getExecution(executionId).get.jobs
+ // Use "<=" because there is a race condition that we may miss some jobs
+ // TODO Change "<=" to "=" once we fix the race condition that missing the JobStarted event.
+ assert(jobs.size <= 1)
+ val metricValues = TestSQLContext.listener.getExecutionMetrics(executionId)
+ // Because "save" will create a new DataFrame internally, we cannot get the real metric id.
+ // However, we still can check the value.
+ assert(metricValues.values.toSeq === Seq(2L))
+ }
+ }
+
}
private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String)