Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala | 19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala | 19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala | 34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala | 5
8 files changed, 92 insertions, 31 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 6f9baa2d33..04a39a126e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -53,8 +53,8 @@ case class Sort(
private val enableRadixSort = sqlContext.conf.enableRadixSort
override private[sql] lazy val metrics = Map(
- "sortTime" -> SQLMetrics.createLongMetric(sparkContext, "sort time"),
- "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+ "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"),
+ "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
def createSorter(): UnsafeExternalRowSorter = {
@@ -86,8 +86,9 @@ case class Sort(
}
protected override def doExecute(): RDD[InternalRow] = {
- val dataSize = longMetric("dataSize")
+ val peakMemory = longMetric("peakMemory")
val spillSize = longMetric("spillSize")
+ val sortTime = longMetric("sortTime")
child.execute().mapPartitionsInternal { iter =>
val sorter = createSorter()
@@ -96,10 +97,12 @@ case class Sort(
// Remember the spill data size of this task before executing this operator so that we can
// figure out how many bytes we spilled for this operator.
val spillSizeBefore = metrics.memoryBytesSpilled
+ val beforeSort = System.nanoTime()
val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
- dataSize += sorter.getPeakMemoryUsage
+ sortTime += (System.nanoTime() - beforeSort) / 1000000
+ peakMemory += sorter.getPeakMemoryUsage
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
@@ -145,19 +148,19 @@ case class Sort(
ctx.copyResult = false
val outputRow = ctx.freshName("outputRow")
- val dataSize = metricTerm(ctx, "dataSize")
+ val peakMemory = metricTerm(ctx, "peakMemory")
val spillSize = metricTerm(ctx, "spillSize")
val spillSizeBefore = ctx.freshName("spillSizeBefore")
val startTime = ctx.freshName("startTime")
val sortTime = metricTerm(ctx, "sortTime")
s"""
| if ($needToSort) {
- | $addToSorter();
| long $spillSizeBefore = $metrics.memoryBytesSpilled();
| long $startTime = System.nanoTime();
+ | $addToSorter();
| $sortedIterator = $sorterVariable.sort();
- | $sortTime.add(System.nanoTime() - $startTime);
- | $dataSize.add($sorterVariable.getPeakMemoryUsage());
+ | $sortTime.add((System.nanoTime() - $startTime) / 1000000);
+ | $peakMemory.add($sorterVariable.getPeakMemoryUsage());
| $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
| $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
| $needToSort = false;
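The timing metrics added in this patch (sortTime here, aggTime, buildTime, and the broadcast timings in the hunks below) all follow the same pattern: take System.nanoTime() checkpoints around the work and divide the difference by 1,000,000 to record whole milliseconds. A minimal standalone sketch of that pattern (the timed helper and recordMs callback are illustrative names, not part of the patch):

object TimingSketch {
  // Run `body`, report its elapsed wall time in whole milliseconds, and return its result.
  def timed[T](recordMs: Long => Unit)(body: => T): T = {
    val start = System.nanoTime()
    val result = body
    recordMs((System.nanoTime() - start) / 1000000) // nanoseconds -> milliseconds
    result
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for `sorter.sort(iter)`: sort a large array and record how long it took.
    val sorted = timed(ms => println(s"sort time: $ms ms")) {
      scala.util.Random.shuffle((1 to 1000000).toVector).toArray.sorted
    }
    println(sorted.length)
  }
}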
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index a23ebec953..362d0d7a72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -26,6 +26,7 @@ import com.google.common.io.ByteStreams
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.unsafe.Platform
/**
@@ -39,12 +40,17 @@ import org.apache.spark.unsafe.Platform
*
* @param numFields the number of fields in the row being serialized.
*/
-private[sql] class UnsafeRowSerializer(numFields: Int) extends Serializer with Serializable {
- override def newInstance(): SerializerInstance = new UnsafeRowSerializerInstance(numFields)
+private[sql] class UnsafeRowSerializer(
+ numFields: Int,
+ dataSize: LongSQLMetric = null) extends Serializer with Serializable {
+ override def newInstance(): SerializerInstance =
+ new UnsafeRowSerializerInstance(numFields, dataSize)
override private[spark] def supportsRelocationOfSerializedObjects: Boolean = true
}
-private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance {
+private class UnsafeRowSerializerInstance(
+ numFields: Int,
+ dataSize: LongSQLMetric) extends SerializerInstance {
/**
* Serializes a stream of UnsafeRows. Within the stream, each record consists of a record
* length (stored as a 4-byte integer, written high byte first), followed by the record's bytes.
@@ -54,9 +60,14 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
private[this] val dOut: DataOutputStream =
new DataOutputStream(new BufferedOutputStream(out))
+ // LongSQLMetricParam.add() is faster than LongSQLMetric.+=
+ val localDataSize = if (dataSize != null) dataSize.localValue else null
+
override def writeValue[T: ClassTag](value: T): SerializationStream = {
val row = value.asInstanceOf[UnsafeRow]
-
+ if (localDataSize != null) {
+ localDataSize.add(row.getSizeInBytes)
+ }
dOut.writeInt(row.getSizeInBytes)
row.writeToStream(dOut, writeBuffer)
this
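The "LongSQLMetricParam.add() is faster than LongSQLMetric.+=" comment above is about keeping the per-record path cheap: the serializer resolves the task-local accumulator value once, up front, and updates it directly for every row instead of going through the accumulator wrapper on each write. The ShuffleExchange hunk below wires longMetric("dataSize") into this serializer. A simplified sketch of that hoisting pattern, with made-up LocalCounter and MetricHandle types standing in for the real metric classes:

object HotPathMetricSketch {
  final class LocalCounter {
    private var total = 0L
    def add(n: Long): Unit = total += n
    def value: Long = total
  }

  // Simplified stand-in for LongSQLMetric: convenient `+=`, plus access to the local value.
  final class MetricHandle(local: LocalCounter) {
    def localValue: LocalCounter = local
    def +=(n: Long): Unit = local.add(n) // one extra hop per call; avoid in tight loops
  }

  def writeAll(rowSizes: Iterator[Int], dataSize: MetricHandle): Unit = {
    // Resolve the local value once, outside the per-record hot path (dataSize may be null).
    val localDataSize = if (dataSize != null) dataSize.localValue else null
    rowSizes.foreach { size =>
      if (localDataSize != null) localDataSize.add(size.toLong)
      // ... write the record itself ...
    }
  }

  def main(args: Array[String]): Unit = {
    val counter = new LocalCounter
    writeAll(Iterator(64, 128, 256), new MetricHandle(counter))
    println(counter.value) // 448
  }
}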
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 5c0fc02861..49b682a951 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
import org.apache.spark.unsafe.KVIterator
@@ -52,8 +52,9 @@ case class TungstenAggregate(
override private[sql] lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
- "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
- "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
+ "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
+ "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
+ "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"))
override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
@@ -83,7 +84,7 @@ case class TungstenAggregate(
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
val numOutputRows = longMetric("numOutputRows")
- val dataSize = longMetric("dataSize")
+ val peakMemory = longMetric("peakMemory")
val spillSize = longMetric("spillSize")
child.execute().mapPartitions { iter =>
@@ -107,7 +108,7 @@ case class TungstenAggregate(
iter,
testFallbackStartsAt,
numOutputRows,
- dataSize,
+ peakMemory,
spillSize)
if (!hasInput && groupingExpressions.isEmpty) {
numOutputRows += 1
@@ -212,10 +213,14 @@ case class TungstenAggregate(
""".stripMargin)
val numOutput = metricTerm(ctx, "numOutputRows")
+ val aggTime = metricTerm(ctx, "aggTime")
+ val beforeAgg = ctx.freshName("beforeAgg")
s"""
| while (!$initAgg) {
| $initAgg = true;
+ | long $beforeAgg = System.nanoTime();
| $doAgg();
+ | $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
|
| // output the result
| ${genResult.trim}
@@ -303,15 +308,17 @@ case class TungstenAggregate(
*/
def finishAggregate(
hashMap: UnsafeFixedWidthAggregationMap,
- sorter: UnsafeKVExternalSorter): KVIterator[UnsafeRow, UnsafeRow] = {
+ sorter: UnsafeKVExternalSorter,
+ peakMemory: LongSQLMetricValue,
+ spillSize: LongSQLMetricValue): KVIterator[UnsafeRow, UnsafeRow] = {
// update peak execution memory
val mapMemory = hashMap.getPeakMemoryUsedBytes
val sorterMemory = Option(sorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
- val peakMemory = Math.max(mapMemory, sorterMemory)
+ val maxMemory = Math.max(mapMemory, sorterMemory)
val metrics = TaskContext.get().taskMetrics()
- metrics.incPeakExecutionMemory(peakMemory)
- // TODO: update data size and spill size
+ peakMemory.add(maxMemory)
+ metrics.incPeakExecutionMemory(maxMemory)
if (sorter == null) {
// not spilled
@@ -365,6 +372,7 @@ case class TungstenAggregate(
true
} else {
+ spillSize.add(sorter.getSpillSize)
false
}
}
@@ -476,6 +484,8 @@ case class TungstenAggregate(
ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm, "")
val doAgg = ctx.freshName("doAggregateWithKeys")
+ val peakMemory = metricTerm(ctx, "peakMemory")
+ val spillSize = metricTerm(ctx, "spillSize")
ctx.addNewFunction(doAgg,
s"""
${if (isVectorizedHashMapEnabled) vectorizedHashMapGenerator.generate() else ""}
@@ -486,7 +496,7 @@ case class TungstenAggregate(
${if (isVectorizedHashMapEnabled) {
s"$iterTermForVectorizedHashMap = $vectorizedHashMapTerm.rowIterator();"} else ""}
- $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm);
+ $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm, $peakMemory, $spillSize);
}
""")
@@ -528,10 +538,14 @@ case class TungstenAggregate(
} else None
}
+ val aggTime = metricTerm(ctx, "aggTime")
+ val beforeAgg = ctx.freshName("beforeAgg")
s"""
if (!$initAgg) {
$initAgg = true;
+ long $beforeAgg = System.nanoTime();
$doAgg();
+ $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
}
// output the result
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index c368726610..9db5087fe0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -87,7 +87,7 @@ class TungstenAggregationIterator(
inputIter: Iterator[InternalRow],
testFallbackStartsAt: Option[(Int, Int)],
numOutputRows: LongSQLMetric,
- dataSize: LongSQLMetric,
+ peakMemory: LongSQLMetric,
spillSize: LongSQLMetric)
extends AggregationIterator(
groupingExpressions,
@@ -415,11 +415,11 @@ class TungstenAggregationIterator(
if (!hasNext) {
val mapMemory = hashMap.getPeakMemoryUsedBytes
val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
- val peakMemory = Math.max(mapMemory, sorterMemory)
+ val maxMemory = Math.max(mapMemory, sorterMemory)
val metrics = TaskContext.get().taskMetrics()
- dataSize += peakMemory
+ peakMemory += maxMemory
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
- metrics.incPeakExecutionMemory(peakMemory)
+ metrics.incPeakExecutionMemory(maxMemory)
}
numOutputRows += 1
res
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
index a4f4213342..87a113ee1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
@@ -23,8 +23,10 @@ import scala.concurrent.duration._
import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.ThreadUtils
/**
@@ -35,6 +37,12 @@ case class BroadcastExchange(
mode: BroadcastMode,
child: SparkPlan) extends Exchange {
+ override private[sql] lazy val metrics = Map(
+ "dataSize" -> SQLMetrics.createLongMetric(sparkContext, "data size (bytes)"),
+ "collectTime" -> SQLMetrics.createLongMetric(sparkContext, "time to collect (ms)"),
+ "buildTime" -> SQLMetrics.createLongMetric(sparkContext, "time to build (ms)"),
+ "broadcastTime" -> SQLMetrics.createLongMetric(sparkContext, "time to broadcast (ms)"))
+
override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
override def sameResult(plan: SparkPlan): Boolean = plan match {
@@ -61,11 +69,21 @@ case class BroadcastExchange(
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sparkContext, executionId) {
+ val beforeCollect = System.nanoTime()
// Note that we use .executeCollect() because we don't want to convert data to Scala types
val input: Array[InternalRow] = child.executeCollect()
+ val beforeBuild = System.nanoTime()
+ longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
+ longMetric("dataSize") += input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
// Construct and broadcast the relation.
- sparkContext.broadcast(mode.transform(input))
+ val relation = mode.transform(input)
+ val beforeBroadcast = System.nanoTime()
+ longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
+
+ val broadcasted = sparkContext.broadcast(relation)
+ longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
+ broadcasted
}
}(BroadcastExchange.executionContext)
}
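BroadcastExchange brackets its three phases with consecutive nanoTime checkpoints, so the end of one phase is the start of the next and collectTime, buildTime, and broadcastTime together account for the block's total wall time; dataSize is simply the sum of the collected UnsafeRows' byte sizes. A generic sketch of that checkpoint structure, using illustrative phased and record names that are not part of the patch:

object PhaseTimingSketch {
  // Run three dependent phases, reporting each phase's duration in milliseconds.
  def phased[A, B, C](collect: () => A)(build: A => B)(publish: B => C)
                     (record: (String, Long) => Unit): C = {
    val t0 = System.nanoTime()
    val input = collect()
    val t1 = System.nanoTime()
    record("collectTime", (t1 - t0) / 1000000)
    val relation = build(input)
    val t2 = System.nanoTime()
    record("buildTime", (t2 - t1) / 1000000)
    val result = publish(relation)
    record("broadcastTime", (System.nanoTime() - t2) / 1000000)
    result
  }

  def main(args: Array[String]): Unit = {
    val total = phased(() => Array.fill(1 << 20)(1L))(_.sum)(identity[Long]) {
      (name, ms) => println(s"$name: $ms ms")
    }
    println(total) // 1048576
  }
}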
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index d7deac9337..e18b59f49b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -25,10 +25,11 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.MutablePair
/**
@@ -39,6 +40,9 @@ case class ShuffleExchange(
child: SparkPlan,
@transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
+ override private[sql] lazy val metrics = Map(
+ "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
+
override def nodeName: String = {
val extraInfo = coordinator match {
case Some(exchangeCoordinator) if exchangeCoordinator.isEstimated =>
@@ -54,7 +58,8 @@ case class ShuffleExchange(
override def outputPartitioning: Partitioning = newPartitioning
- private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+ private val serializer: Serializer =
+ new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
override protected def doPrepare(): Unit = {
// If an ExchangeCoordinator is needed, we register this Exchange operator
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index f021f3758c..785373b225 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -40,7 +40,9 @@ case class ShuffledHashJoin(
extends BinaryNode with HashJoin {
override private[sql] lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+ "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
+ "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))
override def outputPartitioning: Partitioning = joinType match {
case Inner => PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
@@ -57,8 +59,13 @@ case class ShuffledHashJoin(
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
+ val buildDataSize = longMetric("buildDataSize")
+ val buildTime = longMetric("buildTime")
+ val start = System.nanoTime()
val context = TaskContext.get()
val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
+ buildTime += (System.nanoTime() - start) / 1000000
+ buildDataSize += relation.estimatedSize
// This relation is usually used until the end of task.
context.addTaskCompletionListener(_ => relation.close())
relation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 930adabc48..5755c00c1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.metric
+import java.text.NumberFormat
+
import org.apache.spark.{Accumulable, AccumulableParam, Accumulators, SparkContext}
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.util.Utils
@@ -119,7 +121,8 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialVa
override def zero: LongSQLMetricValue = new LongSQLMetricValue(initialValue)
}
-private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 0L)
+private object LongSQLMetricParam
+ extends LongSQLMetricParam(x => NumberFormat.getInstance().format(x.sum), 0L)
private object StatisticsBytesSQLMetricParam extends LongSQLMetricParam(
(values: Seq[Long]) => {
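The final hunk changes how the default long metric is rendered: NumberFormat.getInstance() applies locale-aware grouping separators, so large counts display as, for example, "1,234,567" instead of the plain "1234567" produced by _.sum.toString. A quick illustration, assuming an English default locale:

import java.text.NumberFormat

object NumberFormatSketch {
  def main(args: Array[String]): Unit = {
    val values = Seq(1000000L, 234567L)
    println(values.sum.toString)                            // 1234567
    println(NumberFormat.getInstance().format(values.sum))  // 1,234,567 with an English locale
  }
}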