diff options
author | Andrew Or <andrewor14@gmail.com> | 2014-01-10 21:40:55 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-01-10 21:40:55 -0800 |
commit | bb8098f203e61111faddf2e1a04b03d62037e6c7 (patch) | |
tree | 9acb60ac506edea5567e2d32565cbed5e35be212 | |
parent | e6447152b323a8fdf71ae3a8c1086ba6948e7512 (diff) | |
download | spark-bb8098f203e61111faddf2e1a04b03d62037e6c7.tar.gz spark-bb8098f203e61111faddf2e1a04b03d62037e6c7.tar.bz2 spark-bb8098f203e61111faddf2e1a04b03d62037e6c7.zip |
Add number of bytes spilled to Web UI
11 files changed, 75 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 8b30cd4bfe..0609b7b154 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -32,9 +32,9 @@ case class Aggregator[K, V, C] ( mergeCombiners: (C, C) => C) { private val sparkConf = SparkEnv.get.conf - private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true) + private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", false) - def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = { + def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext) : Iterator[(K, C)] = { if (!externalSorting) { val combiners = new AppendOnlyMap[K,C] var kv: Product2[K, V] = null @@ -53,11 +53,12 @@ case class Aggregator[K, V, C] ( val (k, v) = iter.next() combiners.insert(k, v) } + combiners.registerBytesSpilled(context.attemptId) combiners.iterator } } - def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = { + def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = { if (!externalSorting) { val combiners = new AppendOnlyMap[K,C] var kc: Product2[K, C] = null @@ -75,6 +76,7 @@ case class Aggregator[K, V, C] ( val (k, c) = iter.next() combiners.insert(k, c) } + combiners.registerBytesSpilled(context.attemptId) combiners.iterator } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 08b592df71..6ec075070d 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -60,6 +60,9 @@ class SparkEnv private[spark] ( // All accesses should be manually synchronized val shuffleMemoryMap = mutable.HashMap[Long, Long]() + // A mapping of task ID to number of bytes spilled by that task. This is mainly for book-keeping. + val bytesSpilledMap = mutable.HashMap[Long, Long]() + private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() // A general, soft-reference map for metadata needed during HadoopRDD split computation diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index a7b2328a02..d5dae8902b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -229,6 +229,7 @@ private[spark] class Executor( m.executorRunTime = (taskFinish - taskStart).toInt m.jvmGCTime = gcTime - startGCTime m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt + m.bytesSpilled = env.bytesSpilledMap.get(taskId).getOrElse(0) } val accumUpdates = Accumulators.values @@ -279,11 +280,12 @@ private[spark] class Executor( //System.exit(1) } } finally { - // TODO: Unregister shuffle memory only for ShuffleMapTask + // TODO: Unregister shuffle memory only for ResultTask val shuffleMemoryMap = env.shuffleMemoryMap shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(Thread.currentThread().getId) } + env.bytesSpilledMap.remove(taskId) runningTasks.remove(taskId) } } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index bb1471d9ee..44a15549cb 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -49,6 +49,11 @@ class TaskMetrics extends Serializable { var resultSerializationTime: Long = _ /** + * The number of bytes spilled to disk by this task + */ + var bytesSpilled: Long = _ + + /** * If this task reads from shuffle output, metrics on getting shuffle data will be collected here */ var shuffleReadMetrics: Option[ShuffleReadMetrics] = None diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index a73714abca..6df2b3a52d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -106,7 +106,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: override val partitioner = Some(part) override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = { - val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true) + + val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", false) val split = s.asInstanceOf[CoGroupPartition] val numRdds = split.deps.size @@ -150,6 +151,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: map.insert(kv._1, new CoGroupValue(kv._2, depNum)) } } + map.registerBytesSpilled(context.attemptId) new InterruptibleIterator(context, map.iterator) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 1248409e35..dd25d0c6ed 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -88,20 +88,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) if (self.partitioner == Some(partitioner)) { self.mapPartitionsWithContext((context, iter) => { - new InterruptibleIterator(context, aggregator.combineValuesByKey(iter)) + new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else if (mapSideCombine) { - val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) + val combined = self.mapPartitionsWithContext((context, iter) => { + aggregator.combineValuesByKey(iter, context) + }, preservesPartitioning = true) val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner) .setSerializer(serializerClass) partitioned.mapPartitionsWithContext((context, iter) => { - new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter)) + new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context)) }, preservesPartitioning = true) } else { // Don't apply map-side combiner. val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass) values.mapPartitionsWithContext((context, iter) => { - new InterruptibleIterator(context, aggregator.combineValuesByKey(iter)) + new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala index 3c53e88380..3f9cc97252 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -24,4 +24,5 @@ private[spark] class ExecutorSummary { var succeededTasks : Int = 0 var shuffleRead : Long = 0 var shuffleWrite : Long = 0 + var bytesSpilled : Long = 0 } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 0dd876480a..2522fba8e3 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -48,6 +48,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) <th>Succeeded Tasks</th> <th>Shuffle Read</th> <th>Shuffle Write</th> + <th>Bytes Spilled</th> </thead> <tbody> {createExecutorTable()} @@ -80,6 +81,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) <td>{v.succeededTasks}</td> <td>{Utils.bytesToString(v.shuffleRead)}</td> <td>{Utils.bytesToString(v.shuffleWrite)}</td> + <td>{Utils.bytesToString(v.bytesSpilled)}</td> </tr> } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index bcd2824450..f2c8658f98 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -52,6 +52,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList val stageIdToTime = HashMap[Int, Long]() val stageIdToShuffleRead = HashMap[Int, Long]() val stageIdToShuffleWrite = HashMap[Int, Long]() + val stageIdToBytesSpilled = HashMap[Int, Long]() val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]() val stageIdToTasksComplete = HashMap[Int, Int]() val stageIdToTasksFailed = HashMap[Int, Int]() @@ -78,6 +79,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList stageIdToTime.remove(s.stageId) stageIdToShuffleRead.remove(s.stageId) stageIdToShuffleWrite.remove(s.stageId) + stageIdToBytesSpilled.remove(s.stageId) stageIdToTasksActive.remove(s.stageId) stageIdToTasksComplete.remove(s.stageId) stageIdToTasksFailed.remove(s.stageId) @@ -149,6 +151,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList Option(taskEnd.taskMetrics).foreach { taskMetrics => taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead } taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten } + y.bytesSpilled += taskMetrics.bytesSpilled } } case _ => {} @@ -184,6 +187,10 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList stageIdToShuffleWrite(sid) += shuffleWrite totalShuffleWrite += shuffleWrite + stageIdToBytesSpilled.getOrElseUpdate(sid, 0L) + val bytesSpilled = metrics.map(m => m.bytesSpilled).getOrElse(0L) + stageIdToBytesSpilled(sid) += bytesSpilled + val taskList = stageIdToTaskInfos.getOrElse( sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) taskList -= ((taskEnd.taskInfo, None, None)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 8dcfeacb60..bb104318b0 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -56,6 +56,8 @@ private[spark] class StagePage(parent: JobProgressUI) { val hasShuffleRead = shuffleReadBytes > 0 val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L) val hasShuffleWrite = shuffleWriteBytes > 0 + val bytesSpilled = listener.stageIdToBytesSpilled.getOrElse(stageId, 0L) + val hasBytesSpilled = bytesSpilled > 0 var activeTime = 0L listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now)) @@ -81,6 +83,12 @@ private[spark] class StagePage(parent: JobProgressUI) { {Utils.bytesToString(shuffleWriteBytes)} </li> } + {if (hasBytesSpilled) + <li> + <strong>Bytes spilled: </strong> + {Utils.bytesToString(bytesSpilled)} + </li> + } </ul> </div> @@ -89,9 +97,10 @@ private[spark] class StagePage(parent: JobProgressUI) { Seq("Duration", "GC Time", "Result Ser Time") ++ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++ + {if (hasBytesSpilled) Seq("Bytes Spilled") else Nil} ++ Seq("Errors") - val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks) + val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) // Excludes tasks which failed and have incomplete metrics val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined)) @@ -153,13 +162,20 @@ private[spark] class StagePage(parent: JobProgressUI) { } val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) + val bytesSpilledSizes = validTasks.map { + case(info, metrics, exception) => + metrics.get.bytesSpilled.toDouble + } + val bytesSpilledQuantiles = "Bytes Spilled" +: getQuantileCols(bytesSpilledSizes) + val listings: Seq[Seq[String]] = Seq( serializationQuantiles, serviceQuantiles, gettingResultQuantiles, schedulerDelayQuantiles, if (hasShuffleRead) shuffleReadQuantiles else Nil, - if (hasShuffleWrite) shuffleWriteQuantiles else Nil) + if (hasShuffleWrite) shuffleWriteQuantiles else Nil, + if (hasBytesSpilled) bytesSpilledQuantiles else Nil) val quantileHeaders = Seq("Metric", "Min", "25th percentile", "Median", "75th percentile", "Max") @@ -178,8 +194,7 @@ private[spark] class StagePage(parent: JobProgressUI) { } } - - def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean) + def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean) (taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = { def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] = trace.map(e => <span style="display:block;">{e.toString}</span>) @@ -205,6 +220,10 @@ private[spark] class StagePage(parent: JobProgressUI) { val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms => if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("") + val maybeBytesSpilled = metrics.map{m => m.bytesSpilled} + val bytesSpilledSortable = maybeBytesSpilled.map(_.toString).getOrElse("") + val bytesSpilledReadable = maybeBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("") + <tr> <td>{info.index}</td> <td>{info.taskId}</td> @@ -234,6 +253,11 @@ private[spark] class StagePage(parent: JobProgressUI) { {shuffleWriteReadable} </td> }} + {if (bytesSpilled) { + <td sorttable_customkey={bytesSpilledSortable}> + {bytesSpilledReadable} + </td> + }} <td>{exception.map(e => <span> {e.className} ({e.description})<br/> diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index e3bcd895aa..0100083682 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -77,7 +77,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( } // Number of pairs in the in-memory map - private var numPairsInMemory = 0 + private var numPairsInMemory = 0L // Number of in-memory pairs inserted before tracking the map's shuffle memory usage private val trackMemoryThreshold = 1000 @@ -85,6 +85,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( // How many times we have spilled so far private var spillCount = 0 + // Number of bytes spilled in total + private var bytesSpilled = 0L + private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false) private val comparator = new KCComparator[K, C] @@ -161,6 +164,14 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( shuffleMemoryMap(Thread.currentThread().getId) = 0 } numPairsInMemory = 0 + bytesSpilled += mapSize + } + + /** + * Register the total number of bytes spilled by this task + */ + def registerBytesSpilled(taskId: Long) { + SparkEnv.get.bytesSpilledMap(taskId) = bytesSpilled } /** |