diff options
Diffstat (limited to 'sql')
9 files changed, 99 insertions, 31 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index b6499e35b5..38dbfef76c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -177,6 +177,13 @@ public final class UnsafeKVExternalSorter { } /** + * Return the total number of bytes that has been spilled into disk so far. + */ + public long getSpillSize() { + return sorter.getSpillSize(); + } + + /** * Return the peak memory used so far, in bytes. */ public long getPeakMemoryUsedBytes() { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala index 6f9baa2d33..04a39a126e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala @@ -53,8 +53,8 @@ case class Sort( private val enableRadixSort = sqlContext.conf.enableRadixSort override private[sql] lazy val metrics = Map( - "sortTime" -> SQLMetrics.createLongMetric(sparkContext, "sort time"), - "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), + "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"), + "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"), "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) def createSorter(): UnsafeExternalRowSorter = { @@ -86,8 +86,9 @@ case class Sort( } protected override def doExecute(): RDD[InternalRow] = { - val dataSize = longMetric("dataSize") + val peakMemory = longMetric("peakMemory") val spillSize = longMetric("spillSize") + val sortTime = longMetric("sortTime") child.execute().mapPartitionsInternal { iter => val sorter = createSorter() @@ -96,10 +97,12 @@ case class Sort( // Remember spill data size of this task before execute this operator so that we can // figure out how many bytes we spilled for this operator. val spillSizeBefore = metrics.memoryBytesSpilled + val beforeSort = System.nanoTime() val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) - dataSize += sorter.getPeakMemoryUsage + sortTime += (System.nanoTime() - beforeSort) / 1000000 + peakMemory += sorter.getPeakMemoryUsage spillSize += metrics.memoryBytesSpilled - spillSizeBefore metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage) @@ -145,19 +148,19 @@ case class Sort( ctx.copyResult = false val outputRow = ctx.freshName("outputRow") - val dataSize = metricTerm(ctx, "dataSize") + val peakMemory = metricTerm(ctx, "peakMemory") val spillSize = metricTerm(ctx, "spillSize") val spillSizeBefore = ctx.freshName("spillSizeBefore") val startTime = ctx.freshName("startTime") val sortTime = metricTerm(ctx, "sortTime") s""" | if ($needToSort) { - | $addToSorter(); | long $spillSizeBefore = $metrics.memoryBytesSpilled(); | long $startTime = System.nanoTime(); + | $addToSorter(); | $sortedIterator = $sorterVariable.sort(); - | $sortTime.add(System.nanoTime() - $startTime); - | $dataSize.add($sorterVariable.getPeakMemoryUsage()); + | $sortTime.add((System.nanoTime() - $startTime) / 1000000); + | $peakMemory.add($sorterVariable.getPeakMemoryUsage()); | $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore); | $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage()); | $needToSort = false; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala index a23ebec953..362d0d7a72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala @@ -26,6 +26,7 @@ import com.google.common.io.ByteStreams import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.metric.LongSQLMetric import org.apache.spark.unsafe.Platform /** @@ -39,12 +40,17 @@ import org.apache.spark.unsafe.Platform * * @param numFields the number of fields in the row being serialized. */ -private[sql] class UnsafeRowSerializer(numFields: Int) extends Serializer with Serializable { - override def newInstance(): SerializerInstance = new UnsafeRowSerializerInstance(numFields) +private[sql] class UnsafeRowSerializer( + numFields: Int, + dataSize: LongSQLMetric = null) extends Serializer with Serializable { + override def newInstance(): SerializerInstance = + new UnsafeRowSerializerInstance(numFields, dataSize) override private[spark] def supportsRelocationOfSerializedObjects: Boolean = true } -private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance { +private class UnsafeRowSerializerInstance( + numFields: Int, + dataSize: LongSQLMetric) extends SerializerInstance { /** * Serializes a stream of UnsafeRows. Within the stream, each record consists of a record * length (stored as a 4-byte integer, written high byte first), followed by the record's bytes. @@ -54,9 +60,14 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst private[this] val dOut: DataOutputStream = new DataOutputStream(new BufferedOutputStream(out)) + // LongSQLMetricParam.add() is faster than LongSQLMetric.+= + val localDataSize = if (dataSize != null) dataSize.localValue else null + override def writeValue[T: ClassTag](value: T): SerializationStream = { val row = value.asInstanceOf[UnsafeRow] - + if (localDataSize != null) { + localDataSize.add(row.getSizeInBytes) + } dOut.writeInt(row.getSizeInBytes) row.writeToStream(dOut, writeBuffer) this diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 5c0fc02861..49b682a951 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics} import org.apache.spark.sql.types.{DecimalType, StringType, StructType} import org.apache.spark.unsafe.KVIterator @@ -52,8 +52,9 @@ case class TungstenAggregate( override private[sql] lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), - "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), - "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) + "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"), + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), + "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time")) override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute) @@ -83,7 +84,7 @@ case class TungstenAggregate( protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { val numOutputRows = longMetric("numOutputRows") - val dataSize = longMetric("dataSize") + val peakMemory = longMetric("peakMemory") val spillSize = longMetric("spillSize") child.execute().mapPartitions { iter => @@ -107,7 +108,7 @@ case class TungstenAggregate( iter, testFallbackStartsAt, numOutputRows, - dataSize, + peakMemory, spillSize) if (!hasInput && groupingExpressions.isEmpty) { numOutputRows += 1 @@ -212,10 +213,14 @@ case class TungstenAggregate( """.stripMargin) val numOutput = metricTerm(ctx, "numOutputRows") + val aggTime = metricTerm(ctx, "aggTime") + val beforeAgg = ctx.freshName("beforeAgg") s""" | while (!$initAgg) { | $initAgg = true; + | long $beforeAgg = System.nanoTime(); | $doAgg(); + | $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000); | | // output the result | ${genResult.trim} @@ -303,15 +308,17 @@ case class TungstenAggregate( */ def finishAggregate( hashMap: UnsafeFixedWidthAggregationMap, - sorter: UnsafeKVExternalSorter): KVIterator[UnsafeRow, UnsafeRow] = { + sorter: UnsafeKVExternalSorter, + peakMemory: LongSQLMetricValue, + spillSize: LongSQLMetricValue): KVIterator[UnsafeRow, UnsafeRow] = { // update peak execution memory val mapMemory = hashMap.getPeakMemoryUsedBytes val sorterMemory = Option(sorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L) - val peakMemory = Math.max(mapMemory, sorterMemory) + val maxMemory = Math.max(mapMemory, sorterMemory) val metrics = TaskContext.get().taskMetrics() - metrics.incPeakExecutionMemory(peakMemory) - // TODO: update data size and spill size + peakMemory.add(maxMemory) + metrics.incPeakExecutionMemory(maxMemory) if (sorter == null) { // not spilled @@ -365,6 +372,7 @@ case class TungstenAggregate( true } else { + spillSize.add(sorter.getSpillSize) false } } @@ -476,6 +484,8 @@ case class TungstenAggregate( ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm, "") val doAgg = ctx.freshName("doAggregateWithKeys") + val peakMemory = metricTerm(ctx, "peakMemory") + val spillSize = metricTerm(ctx, "spillSize") ctx.addNewFunction(doAgg, s""" ${if (isVectorizedHashMapEnabled) vectorizedHashMapGenerator.generate() else ""} @@ -486,7 +496,7 @@ case class TungstenAggregate( ${if (isVectorizedHashMapEnabled) { s"$iterTermForVectorizedHashMap = $vectorizedHashMapTerm.rowIterator();"} else ""} - $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm); + $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm, $peakMemory, $spillSize); } """) @@ -528,10 +538,14 @@ case class TungstenAggregate( } else None } + val aggTime = metricTerm(ctx, "aggTime") + val beforeAgg = ctx.freshName("beforeAgg") s""" if (!$initAgg) { $initAgg = true; + long $beforeAgg = System.nanoTime(); $doAgg(); + $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000); } // output the result diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index c368726610..9db5087fe0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -87,7 +87,7 @@ class TungstenAggregationIterator( inputIter: Iterator[InternalRow], testFallbackStartsAt: Option[(Int, Int)], numOutputRows: LongSQLMetric, - dataSize: LongSQLMetric, + peakMemory: LongSQLMetric, spillSize: LongSQLMetric) extends AggregationIterator( groupingExpressions, @@ -415,11 +415,11 @@ class TungstenAggregationIterator( if (!hasNext) { val mapMemory = hashMap.getPeakMemoryUsedBytes val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L) - val peakMemory = Math.max(mapMemory, sorterMemory) + val maxMemory = Math.max(mapMemory, sorterMemory) val metrics = TaskContext.get().taskMetrics() - dataSize += peakMemory + peakMemory += maxMemory spillSize += metrics.memoryBytesSpilled - spillSizeBefore - metrics.incPeakExecutionMemory(peakMemory) + metrics.incPeakExecutionMemory(maxMemory) } numOutputRows += 1 res diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala index a4f4213342..87a113ee1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala @@ -23,8 +23,10 @@ import scala.concurrent.duration._ import org.apache.spark.broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning} import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.util.ThreadUtils /** @@ -35,6 +37,12 @@ case class BroadcastExchange( mode: BroadcastMode, child: SparkPlan) extends Exchange { + override private[sql] lazy val metrics = Map( + "dataSize" -> SQLMetrics.createLongMetric(sparkContext, "data size (bytes)"), + "collectTime" -> SQLMetrics.createLongMetric(sparkContext, "time to collect (ms)"), + "buildTime" -> SQLMetrics.createLongMetric(sparkContext, "time to build (ms)"), + "broadcastTime" -> SQLMetrics.createLongMetric(sparkContext, "time to broadcast (ms)")) + override def outputPartitioning: Partitioning = BroadcastPartitioning(mode) override def sameResult(plan: SparkPlan): Boolean = plan match { @@ -61,11 +69,21 @@ case class BroadcastExchange( // This will run in another thread. Set the execution id so that we can connect these jobs // with the correct execution. SQLExecution.withExecutionId(sparkContext, executionId) { + val beforeCollect = System.nanoTime() // Note that we use .executeCollect() because we don't want to convert data to Scala types val input: Array[InternalRow] = child.executeCollect() + val beforeBuild = System.nanoTime() + longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000 + longMetric("dataSize") += input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum // Construct and broadcast the relation. - sparkContext.broadcast(mode.transform(input)) + val relation = mode.transform(input) + val beforeBroadcast = System.nanoTime() + longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000 + + val broadcasted = sparkContext.broadcast(relation) + longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000 + broadcasted } }(BroadcastExchange.executionContext) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala index d7deac9337..e18b59f49b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala @@ -25,10 +25,11 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ -import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.util.MutablePair /** @@ -39,6 +40,9 @@ case class ShuffleExchange( child: SparkPlan, @transient coordinator: Option[ExchangeCoordinator]) extends Exchange { + override private[sql] lazy val metrics = Map( + "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")) + override def nodeName: String = { val extraInfo = coordinator match { case Some(exchangeCoordinator) if exchangeCoordinator.isEstimated => @@ -54,7 +58,8 @@ case class ShuffleExchange( override def outputPartitioning: Partitioning = newPartitioning - private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) + private val serializer: Serializer = + new UnsafeRowSerializer(child.output.size, longMetric("dataSize")) override protected def doPrepare(): Unit = { // If an ExchangeCoordinator is needed, we register this Exchange operator diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala index f021f3758c..785373b225 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala @@ -40,7 +40,9 @@ case class ShuffledHashJoin( extends BinaryNode with HashJoin { override private[sql] lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), + "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"), + "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map")) override def outputPartitioning: Partitioning = joinType match { case Inner => PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning)) @@ -57,8 +59,13 @@ case class ShuffledHashJoin( ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = { + val buildDataSize = longMetric("buildDataSize") + val buildTime = longMetric("buildTime") + val start = System.nanoTime() val context = TaskContext.get() val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager()) + buildTime += (System.nanoTime() - start) / 1000000 + buildDataSize += relation.estimatedSize // This relation is usually used until the end of task. context.addTaskCompletionListener(_ => relation.close()) relation diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 930adabc48..5755c00c1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.metric +import java.text.NumberFormat + import org.apache.spark.{Accumulable, AccumulableParam, Accumulators, SparkContext} import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.util.Utils @@ -119,7 +121,8 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialVa override def zero: LongSQLMetricValue = new LongSQLMetricValue(initialValue) } -private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 0L) +private object LongSQLMetricParam + extends LongSQLMetricParam(x => NumberFormat.getInstance().format(x.sum), 0L) private object StatisticsBytesSQLMetricParam extends LongSQLMetricParam( (values: Seq[Long]) => { |