aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-04-22 12:59:32 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-22 12:59:32 -0700
commit0dcf9dbebbd53aaebe17c85ede7ab7847ce83137 (patch)
tree8dd6b2fa65e308bb10294f8ea76cbd0de1ace382
parent0419d63169274ecd60c05c1ef4ce2d4ed3a49605 (diff)
downloadspark-0dcf9dbebbd53aaebe17c85ede7ab7847ce83137.tar.gz
spark-0dcf9dbebbd53aaebe17c85ede7ab7847ce83137.tar.bz2
spark-0dcf9dbebbd53aaebe17c85ede7ab7847ce83137.zip
[SPARK-14669] [SQL] Fix some SQL metrics in codegen and added more
## What changes were proposed in this pull request? 1. Fix the "spill size" of TungstenAggregate and Sort 2. Rename "data size" to "peak memory" to match the actual meaning (also consistent with task metrics) 3. Added "data size" for ShuffleExchange and BroadcastExchange 4. Added some timing for Sort, Aggregate and BroadcastExchange (this requires another patch to work) ## How was this patch tested? Existing tests. ![metrics](https://cloud.githubusercontent.com/assets/40902/14573908/21ad2f00-030d-11e6-9e2c-c544f30039ea.png) Author: Davies Liu <davies@databricks.com> Closes #12425 from davies/fix_metrics.
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java12
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala19
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala19
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala34
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala20
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala5
10 files changed, 110 insertions, 32 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 66a77982ad..3c1cd39dc2 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -75,6 +75,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private MemoryBlock currentPage = null;
private long pageCursor = -1;
private long peakMemoryUsedBytes = 0;
+ private long totalSpillBytes = 0L;
private volatile SpillableIterator readingIterator = null;
public static UnsafeExternalSorter createWithExistingInMemorySorter(
@@ -215,7 +216,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// we might not be able to get memory for the pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
-
+ totalSpillBytes += spillSize;
return spillSize;
}
@@ -246,6 +247,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
return peakMemoryUsedBytes;
}
+ /**
+ * Return the total number of bytes that has been spilled into disk so far.
+ */
+ public long getSpillSize() {
+ return totalSpillBytes;
+ }
+
@VisibleForTesting
public int getNumberOfAllocatedPages() {
return allocatedPages.size();
@@ -499,6 +507,8 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
released += inMemSorter.getMemoryUsage();
inMemSorter.free();
inMemSorter = null;
+ taskContext.taskMetrics().incMemoryBytesSpilled(released);
+ totalSpillBytes += released;
return released;
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index b6499e35b5..38dbfef76c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -177,6 +177,13 @@ public final class UnsafeKVExternalSorter {
}
/**
+ * Return the total number of bytes that has been spilled into disk so far.
+ */
+ public long getSpillSize() {
+ return sorter.getSpillSize();
+ }
+
+ /**
* Return the peak memory used so far, in bytes.
*/
public long getPeakMemoryUsedBytes() {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 6f9baa2d33..04a39a126e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -53,8 +53,8 @@ case class Sort(
private val enableRadixSort = sqlContext.conf.enableRadixSort
override private[sql] lazy val metrics = Map(
- "sortTime" -> SQLMetrics.createLongMetric(sparkContext, "sort time"),
- "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+ "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"),
+ "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
def createSorter(): UnsafeExternalRowSorter = {
@@ -86,8 +86,9 @@ case class Sort(
}
protected override def doExecute(): RDD[InternalRow] = {
- val dataSize = longMetric("dataSize")
+ val peakMemory = longMetric("peakMemory")
val spillSize = longMetric("spillSize")
+ val sortTime = longMetric("sortTime")
child.execute().mapPartitionsInternal { iter =>
val sorter = createSorter()
@@ -96,10 +97,12 @@ case class Sort(
// Remember spill data size of this task before execute this operator so that we can
// figure out how many bytes we spilled for this operator.
val spillSizeBefore = metrics.memoryBytesSpilled
+ val beforeSort = System.nanoTime()
val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
- dataSize += sorter.getPeakMemoryUsage
+ sortTime += (System.nanoTime() - beforeSort) / 1000000
+ peakMemory += sorter.getPeakMemoryUsage
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
@@ -145,19 +148,19 @@ case class Sort(
ctx.copyResult = false
val outputRow = ctx.freshName("outputRow")
- val dataSize = metricTerm(ctx, "dataSize")
+ val peakMemory = metricTerm(ctx, "peakMemory")
val spillSize = metricTerm(ctx, "spillSize")
val spillSizeBefore = ctx.freshName("spillSizeBefore")
val startTime = ctx.freshName("startTime")
val sortTime = metricTerm(ctx, "sortTime")
s"""
| if ($needToSort) {
- | $addToSorter();
| long $spillSizeBefore = $metrics.memoryBytesSpilled();
| long $startTime = System.nanoTime();
+ | $addToSorter();
| $sortedIterator = $sorterVariable.sort();
- | $sortTime.add(System.nanoTime() - $startTime);
- | $dataSize.add($sorterVariable.getPeakMemoryUsage());
+ | $sortTime.add((System.nanoTime() - $startTime) / 1000000);
+ | $peakMemory.add($sorterVariable.getPeakMemoryUsage());
| $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
| $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
| $needToSort = false;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index a23ebec953..362d0d7a72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -26,6 +26,7 @@ import com.google.common.io.ByteStreams
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.unsafe.Platform
/**
@@ -39,12 +40,17 @@ import org.apache.spark.unsafe.Platform
*
* @param numFields the number of fields in the row being serialized.
*/
-private[sql] class UnsafeRowSerializer(numFields: Int) extends Serializer with Serializable {
- override def newInstance(): SerializerInstance = new UnsafeRowSerializerInstance(numFields)
+private[sql] class UnsafeRowSerializer(
+ numFields: Int,
+ dataSize: LongSQLMetric = null) extends Serializer with Serializable {
+ override def newInstance(): SerializerInstance =
+ new UnsafeRowSerializerInstance(numFields, dataSize)
override private[spark] def supportsRelocationOfSerializedObjects: Boolean = true
}
-private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance {
+private class UnsafeRowSerializerInstance(
+ numFields: Int,
+ dataSize: LongSQLMetric) extends SerializerInstance {
/**
* Serializes a stream of UnsafeRows. Within the stream, each record consists of a record
* length (stored as a 4-byte integer, written high byte first), followed by the record's bytes.
@@ -54,9 +60,14 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
private[this] val dOut: DataOutputStream =
new DataOutputStream(new BufferedOutputStream(out))
+ // LongSQLMetricParam.add() is faster than LongSQLMetric.+=
+ val localDataSize = if (dataSize != null) dataSize.localValue else null
+
override def writeValue[T: ClassTag](value: T): SerializationStream = {
val row = value.asInstanceOf[UnsafeRow]
-
+ if (localDataSize != null) {
+ localDataSize.add(row.getSizeInBytes)
+ }
dOut.writeInt(row.getSizeInBytes)
row.writeToStream(dOut, writeBuffer)
this
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 5c0fc02861..49b682a951 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
import org.apache.spark.unsafe.KVIterator
@@ -52,8 +52,9 @@ case class TungstenAggregate(
override private[sql] lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
- "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
- "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
+ "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
+ "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
+ "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"))
override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
@@ -83,7 +84,7 @@ case class TungstenAggregate(
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
val numOutputRows = longMetric("numOutputRows")
- val dataSize = longMetric("dataSize")
+ val peakMemory = longMetric("peakMemory")
val spillSize = longMetric("spillSize")
child.execute().mapPartitions { iter =>
@@ -107,7 +108,7 @@ case class TungstenAggregate(
iter,
testFallbackStartsAt,
numOutputRows,
- dataSize,
+ peakMemory,
spillSize)
if (!hasInput && groupingExpressions.isEmpty) {
numOutputRows += 1
@@ -212,10 +213,14 @@ case class TungstenAggregate(
""".stripMargin)
val numOutput = metricTerm(ctx, "numOutputRows")
+ val aggTime = metricTerm(ctx, "aggTime")
+ val beforeAgg = ctx.freshName("beforeAgg")
s"""
| while (!$initAgg) {
| $initAgg = true;
+ | long $beforeAgg = System.nanoTime();
| $doAgg();
+ | $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
|
| // output the result
| ${genResult.trim}
@@ -303,15 +308,17 @@ case class TungstenAggregate(
*/
def finishAggregate(
hashMap: UnsafeFixedWidthAggregationMap,
- sorter: UnsafeKVExternalSorter): KVIterator[UnsafeRow, UnsafeRow] = {
+ sorter: UnsafeKVExternalSorter,
+ peakMemory: LongSQLMetricValue,
+ spillSize: LongSQLMetricValue): KVIterator[UnsafeRow, UnsafeRow] = {
// update peak execution memory
val mapMemory = hashMap.getPeakMemoryUsedBytes
val sorterMemory = Option(sorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
- val peakMemory = Math.max(mapMemory, sorterMemory)
+ val maxMemory = Math.max(mapMemory, sorterMemory)
val metrics = TaskContext.get().taskMetrics()
- metrics.incPeakExecutionMemory(peakMemory)
- // TODO: update data size and spill size
+ peakMemory.add(maxMemory)
+ metrics.incPeakExecutionMemory(maxMemory)
if (sorter == null) {
// not spilled
@@ -365,6 +372,7 @@ case class TungstenAggregate(
true
} else {
+ spillSize.add(sorter.getSpillSize)
false
}
}
@@ -476,6 +484,8 @@ case class TungstenAggregate(
ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm, "")
val doAgg = ctx.freshName("doAggregateWithKeys")
+ val peakMemory = metricTerm(ctx, "peakMemory")
+ val spillSize = metricTerm(ctx, "spillSize")
ctx.addNewFunction(doAgg,
s"""
${if (isVectorizedHashMapEnabled) vectorizedHashMapGenerator.generate() else ""}
@@ -486,7 +496,7 @@ case class TungstenAggregate(
${if (isVectorizedHashMapEnabled) {
s"$iterTermForVectorizedHashMap = $vectorizedHashMapTerm.rowIterator();"} else ""}
- $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm);
+ $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm, $peakMemory, $spillSize);
}
""")
@@ -528,10 +538,14 @@ case class TungstenAggregate(
} else None
}
+ val aggTime = metricTerm(ctx, "aggTime")
+ val beforeAgg = ctx.freshName("beforeAgg")
s"""
if (!$initAgg) {
$initAgg = true;
+ long $beforeAgg = System.nanoTime();
$doAgg();
+ $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
}
// output the result
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index c368726610..9db5087fe0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -87,7 +87,7 @@ class TungstenAggregationIterator(
inputIter: Iterator[InternalRow],
testFallbackStartsAt: Option[(Int, Int)],
numOutputRows: LongSQLMetric,
- dataSize: LongSQLMetric,
+ peakMemory: LongSQLMetric,
spillSize: LongSQLMetric)
extends AggregationIterator(
groupingExpressions,
@@ -415,11 +415,11 @@ class TungstenAggregationIterator(
if (!hasNext) {
val mapMemory = hashMap.getPeakMemoryUsedBytes
val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
- val peakMemory = Math.max(mapMemory, sorterMemory)
+ val maxMemory = Math.max(mapMemory, sorterMemory)
val metrics = TaskContext.get().taskMetrics()
- dataSize += peakMemory
+ peakMemory += maxMemory
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
- metrics.incPeakExecutionMemory(peakMemory)
+ metrics.incPeakExecutionMemory(maxMemory)
}
numOutputRows += 1
res
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
index a4f4213342..87a113ee1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
@@ -23,8 +23,10 @@ import scala.concurrent.duration._
import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.ThreadUtils
/**
@@ -35,6 +37,12 @@ case class BroadcastExchange(
mode: BroadcastMode,
child: SparkPlan) extends Exchange {
+ override private[sql] lazy val metrics = Map(
+ "dataSize" -> SQLMetrics.createLongMetric(sparkContext, "data size (bytes)"),
+ "collectTime" -> SQLMetrics.createLongMetric(sparkContext, "time to collect (ms)"),
+ "buildTime" -> SQLMetrics.createLongMetric(sparkContext, "time to build (ms)"),
+ "broadcastTime" -> SQLMetrics.createLongMetric(sparkContext, "time to broadcast (ms)"))
+
override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
override def sameResult(plan: SparkPlan): Boolean = plan match {
@@ -61,11 +69,21 @@ case class BroadcastExchange(
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sparkContext, executionId) {
+ val beforeCollect = System.nanoTime()
// Note that we use .executeCollect() because we don't want to convert data to Scala types
val input: Array[InternalRow] = child.executeCollect()
+ val beforeBuild = System.nanoTime()
+ longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
+ longMetric("dataSize") += input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
// Construct and broadcast the relation.
- sparkContext.broadcast(mode.transform(input))
+ val relation = mode.transform(input)
+ val beforeBroadcast = System.nanoTime()
+ longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
+
+ val broadcasted = sparkContext.broadcast(relation)
+ longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
+ broadcasted
}
}(BroadcastExchange.executionContext)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index d7deac9337..e18b59f49b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -25,10 +25,11 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.MutablePair
/**
@@ -39,6 +40,9 @@ case class ShuffleExchange(
child: SparkPlan,
@transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
+ override private[sql] lazy val metrics = Map(
+ "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
+
override def nodeName: String = {
val extraInfo = coordinator match {
case Some(exchangeCoordinator) if exchangeCoordinator.isEstimated =>
@@ -54,7 +58,8 @@ case class ShuffleExchange(
override def outputPartitioning: Partitioning = newPartitioning
- private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+ private val serializer: Serializer =
+ new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
override protected def doPrepare(): Unit = {
// If an ExchangeCoordinator is needed, we register this Exchange operator
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index f021f3758c..785373b225 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -40,7 +40,9 @@ case class ShuffledHashJoin(
extends BinaryNode with HashJoin {
override private[sql] lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+ "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
+ "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))
override def outputPartitioning: Partitioning = joinType match {
case Inner => PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
@@ -57,8 +59,13 @@ case class ShuffledHashJoin(
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
+ val buildDataSize = longMetric("buildDataSize")
+ val buildTime = longMetric("buildTime")
+ val start = System.nanoTime()
val context = TaskContext.get()
val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
+ buildTime += (System.nanoTime() - start) / 1000000
+ buildDataSize += relation.estimatedSize
// This relation is usually used until the end of task.
context.addTaskCompletionListener(_ => relation.close())
relation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 930adabc48..5755c00c1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.metric
+import java.text.NumberFormat
+
import org.apache.spark.{Accumulable, AccumulableParam, Accumulators, SparkContext}
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.util.Utils
@@ -119,7 +121,8 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialVa
override def zero: LongSQLMetricValue = new LongSQLMetricValue(initialValue)
}
-private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 0L)
+private object LongSQLMetricParam
+ extends LongSQLMetricParam(x => NumberFormat.getInstance().format(x.sum), 0L)
private object StatisticsBytesSQLMetricParam extends LongSQLMetricParam(
(values: Seq[Long]) => {