author     Andrew Or <andrewor14@gmail.com>    2014-01-10 21:40:55 -0800
committer  Andrew Or <andrewor14@gmail.com>    2014-01-10 21:40:55 -0800
commit     bb8098f203e61111faddf2e1a04b03d62037e6c7 (patch)
tree       9acb60ac506edea5567e2d32565cbed5e35be212
parent     e6447152b323a8fdf71ae3a8c1086ba6948e7512 (diff)
Add number of bytes spilled to Web UI
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                            |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                              |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                     |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala                  |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                      |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala                  | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala               |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala                 |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala           |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala                     | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 13
11 files changed, 75 insertions, 14 deletions
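
Taken together, the patch threads a per-task spill count from ExternalAppendOnlyMap through SparkEnv and TaskMetrics into the stage and executor pages. A minimal, self-contained sketch of that flow in plain Scala (SpillingMap, FakeMetrics and the byte sizes are illustrative stand-ins, not Spark classes):

import scala.collection.mutable

object BytesSpilledFlow {
  // taskId -> total bytes spilled by that task (book-keeping, as in SparkEnv)
  val bytesSpilledMap = mutable.HashMap[Long, Long]()

  // Simplified stand-in for ExternalAppendOnlyMap: accumulates spill sizes,
  // then registers the total under the owning task's id
  class SpillingMap {
    private var bytesSpilled = 0L
    def spill(mapSize: Long): Unit = { bytesSpilled += mapSize }
    def registerBytesSpilled(taskId: Long): Unit = { bytesSpilledMap(taskId) = bytesSpilled }
  }

  // Simplified stand-in for TaskMetrics
  class FakeMetrics { var bytesSpilled: Long = 0L }

  def main(args: Array[String]): Unit = {
    val taskId = 42L
    val map = new SpillingMap
    map.spill(1024L); map.spill(2048L)             // two spills during aggregation
    map.registerBytesSpilled(taskId)               // Aggregator / CoGroupedRDD step

    val m = new FakeMetrics                        // Executor step: copy into metrics ...
    m.bytesSpilled = bytesSpilledMap.get(taskId).getOrElse(0L)
    bytesSpilledMap.remove(taskId)                 // ... and clean up in the finally block

    println(s"bytes spilled reported to the UI: ${m.bytesSpilled}")  // 3072
  }
}
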
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 8b30cd4bfe..0609b7b154 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -32,9 +32,9 @@ case class Aggregator[K, V, C] (
mergeCombiners: (C, C) => C) {
private val sparkConf = SparkEnv.get.conf
- private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
+ private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", false)
- def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
+ def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext) : Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
@@ -53,11 +53,12 @@ case class Aggregator[K, V, C] (
val (k, v) = iter.next()
combiners.insert(k, v)
}
+ combiners.registerBytesSpilled(context.attemptId)
combiners.iterator
}
}
- def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
+ def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
@@ -75,6 +76,7 @@ case class Aggregator[K, V, C] (
val (k, c) = iter.next()
combiners.insert(k, c)
}
+ combiners.registerBytesSpilled(context.attemptId)
combiners.iterator
}
}
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 08b592df71..6ec075070d 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -60,6 +60,9 @@ class SparkEnv private[spark] (
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()
+ // A mapping of task ID to number of bytes spilled by that task. This is mainly for book-keeping.
+ val bytesSpilledMap = mutable.HashMap[Long, Long]()
+
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
// A general, soft-reference map for metadata needed during HadoopRDD split computation
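
bytesSpilledMap follows the same per-task book-keeping shape as shuffleMemoryMap above, whose comment asks for manual synchronization. A hedged sketch of how reporting and cleanup could be guarded in the same way, using stand-in names rather than the patch's actual call sites:

import scala.collection.mutable

object SpillBookKeeping {
  // Mirrors the shape of SparkEnv.bytesSpilledMap: task ID -> bytes spilled
  val bytesSpilledMap = mutable.HashMap[Long, Long]()

  // If several task threads report at once, guarding the map the way
  // shuffleMemoryMap is guarded avoids lost updates on the mutable HashMap.
  def record(taskId: Long, bytes: Long): Unit = bytesSpilledMap.synchronized {
    bytesSpilledMap(taskId) = bytes
  }

  def readAndClear(taskId: Long): Long = bytesSpilledMap.synchronized {
    val bytes = bytesSpilledMap.getOrElse(taskId, 0L)
    bytesSpilledMap.remove(taskId)
    bytes
  }

  def main(args: Array[String]): Unit = {
    record(taskId = 7L, bytes = 4096L)
    println(readAndClear(7L))    // 4096
  }
}
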
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a7b2328a02..d5dae8902b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -229,6 +229,7 @@ private[spark] class Executor(
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
+ m.bytesSpilled = env.bytesSpilledMap.get(taskId).getOrElse(0)
}
val accumUpdates = Accumulators.values
@@ -279,11 +280,12 @@ private[spark] class Executor(
//System.exit(1)
}
} finally {
- // TODO: Unregister shuffle memory only for ShuffleMapTask
+ // TODO: Unregister shuffle memory only for ResultTask
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
+ env.bytesSpilledMap.remove(taskId)
runningTasks.remove(taskId)
}
}
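
The executor consumes the entry when a task succeeds and always drops it in the finally block, so failed or interrupted tasks do not leak book-keeping. A small stand-alone illustration of that lifecycle (runTask, FakeMetrics and the 2 KB figure are made up for the example):

import scala.collection.mutable

object TaskCleanup {
  val bytesSpilledMap = mutable.HashMap[Long, Long]()
  val runningTasks = mutable.HashSet[Long]()

  final class FakeMetrics { var bytesSpilled: Long = 0L }

  // Run a task body; on success copy its spill total into metrics,
  // and always clear the per-task book-keeping, even if the body throws.
  def runTask(taskId: Long)(body: => Unit): FakeMetrics = {
    runningTasks += taskId
    val m = new FakeMetrics
    try {
      body
      m.bytesSpilled = bytesSpilledMap.getOrElse(taskId, 0L)
      m
    } finally {
      bytesSpilledMap.remove(taskId)   // mirrors env.bytesSpilledMap.remove(taskId)
      runningTasks.remove(taskId)      // mirrors runningTasks.remove(taskId)
    }
  }

  def main(args: Array[String]): Unit = {
    bytesSpilledMap(1L) = 2048L                  // pretend the task spilled 2 KB
    val metrics = runTask(1L) { /* task body */ }
    println(metrics.bytesSpilled)                // 2048
    println(bytesSpilledMap.contains(1L))        // false: cleaned up in finally
  }
}
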
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index bb1471d9ee..44a15549cb 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -49,6 +49,11 @@ class TaskMetrics extends Serializable {
var resultSerializationTime: Long = _
/**
+ * The number of bytes spilled to disk by this task
+ */
+ var bytesSpilled: Long = _
+
+ /**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index a73714abca..6df2b3a52d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -106,7 +106,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
- val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
+
+ val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", false)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
@@ -150,6 +151,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
map.insert(kv._1, new CoGroupValue(kv._2, depNum))
}
}
+ map.registerBytesSpilled(context.attemptId)
new InterruptibleIterator(context, map.iterator)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 1248409e35..dd25d0c6ed 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -88,20 +88,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
self.mapPartitionsWithContext((context, iter) => {
- new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
+ new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else if (mapSideCombine) {
- val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+ val combined = self.mapPartitionsWithContext((context, iter) => {
+ aggregator.combineValuesByKey(iter, context)
+ }, preservesPartitioning = true)
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializerClass)
partitioned.mapPartitionsWithContext((context, iter) => {
- new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
+ new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
values.mapPartitionsWithContext((context, iter) => {
- new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
+ new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
}
}
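
Each branch switches from mapPartitions to mapPartitionsWithContext so the aggregator can see the running task. A usage sketch of that RDD method, assuming a Spark build from this era where mapPartitionsWithContext and TaskContext.attemptId are available, with made-up data and a local SparkContext:

import org.apache.spark.{SparkContext, TaskContext}

object MapPartitionsWithContextExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "spill-context-example")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 2)

    // The same pattern the patch uses in combineByKey: the closure receives the
    // TaskContext, so per-task state (here just the attempt id) is available
    // alongside the partition iterator.
    val tagged = pairs.mapPartitionsWithContext((context: TaskContext, iter: Iterator[(String, Int)]) => {
      val attemptId = context.attemptId
      iter.map { case (k, v) => (k, (attemptId, v)) }
    }, preservesPartitioning = true)

    tagged.collect().foreach(println)
    sc.stop()
  }
}
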
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 3c53e88380..3f9cc97252 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -24,4 +24,5 @@ private[spark] class ExecutorSummary {
var succeededTasks : Int = 0
var shuffleRead : Long = 0
var shuffleWrite : Long = 0
+ var bytesSpilled : Long = 0
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 0dd876480a..2522fba8e3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -48,6 +48,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<th>Succeeded Tasks</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
+ <th>Bytes Spilled</th>
</thead>
<tbody>
{createExecutorTable()}
@@ -80,6 +81,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<td>{v.succeededTasks}</td>
<td>{Utils.bytesToString(v.shuffleRead)}</td>
<td>{Utils.bytesToString(v.shuffleWrite)}</td>
+ <td>{Utils.bytesToString(v.bytesSpilled)}</td>
</tr>
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index bcd2824450..f2c8658f98 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -52,6 +52,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageIdToTime = HashMap[Int, Long]()
val stageIdToShuffleRead = HashMap[Int, Long]()
val stageIdToShuffleWrite = HashMap[Int, Long]()
+ val stageIdToBytesSpilled = HashMap[Int, Long]()
val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageIdToTasksComplete = HashMap[Int, Int]()
val stageIdToTasksFailed = HashMap[Int, Int]()
@@ -78,6 +79,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageIdToTime.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
stageIdToShuffleWrite.remove(s.stageId)
+ stageIdToBytesSpilled.remove(s.stageId)
stageIdToTasksActive.remove(s.stageId)
stageIdToTasksComplete.remove(s.stageId)
stageIdToTasksFailed.remove(s.stageId)
@@ -149,6 +151,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
Option(taskEnd.taskMetrics).foreach { taskMetrics =>
taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
+ y.bytesSpilled += taskMetrics.bytesSpilled
}
}
case _ => {}
@@ -184,6 +187,10 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageIdToShuffleWrite(sid) += shuffleWrite
totalShuffleWrite += shuffleWrite
+ stageIdToBytesSpilled.getOrElseUpdate(sid, 0L)
+ val bytesSpilled = metrics.map(m => m.bytesSpilled).getOrElse(0L)
+ stageIdToBytesSpilled(sid) += bytesSpilled
+
val taskList = stageIdToTaskInfos.getOrElse(
sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
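
The stage total is kept with the same getOrElseUpdate-then-accumulate pattern as the shuffle read and write counters. A compact stand-alone version of that aggregation, with invented stage and byte values:

import scala.collection.mutable.HashMap

object StageSpillTotals {
  val stageIdToBytesSpilled = HashMap[Int, Long]()

  // Called once per finished task: ensure the stage has an entry, then add
  // whatever the task's metrics reported (0L when metrics are missing).
  def onTaskEnd(stageId: Int, metrics: Option[Long]): Unit = {
    stageIdToBytesSpilled.getOrElseUpdate(stageId, 0L)
    stageIdToBytesSpilled(stageId) += metrics.getOrElse(0L)
  }

  def main(args: Array[String]): Unit = {
    onTaskEnd(stageId = 3, metrics = Some(1024L))
    onTaskEnd(stageId = 3, metrics = None)
    onTaskEnd(stageId = 3, metrics = Some(512L))
    println(stageIdToBytesSpilled(3))   // 1536
  }
}
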
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8dcfeacb60..bb104318b0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -56,6 +56,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
val hasShuffleRead = shuffleReadBytes > 0
val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
val hasShuffleWrite = shuffleWriteBytes > 0
+ val bytesSpilled = listener.stageIdToBytesSpilled.getOrElse(stageId, 0L)
+ val hasBytesSpilled = bytesSpilled > 0
var activeTime = 0L
listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
@@ -81,6 +83,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
{Utils.bytesToString(shuffleWriteBytes)}
</li>
}
+ {if (hasBytesSpilled)
+ <li>
+ <strong>Bytes spilled: </strong>
+ {Utils.bytesToString(bytesSpilled)}
+ </li>
+ }
</ul>
</div>
@@ -89,9 +97,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
Seq("Duration", "GC Time", "Result Ser Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
+ {if (hasBytesSpilled) Seq("Bytes Spilled") else Nil} ++
Seq("Errors")
- val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
+ val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
@@ -153,13 +162,20 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
+ val bytesSpilledSizes = validTasks.map {
+ case(info, metrics, exception) =>
+ metrics.get.bytesSpilled.toDouble
+ }
+ val bytesSpilledQuantiles = "Bytes Spilled" +: getQuantileCols(bytesSpilledSizes)
+
val listings: Seq[Seq[String]] = Seq(
serializationQuantiles,
serviceQuantiles,
gettingResultQuantiles,
schedulerDelayQuantiles,
if (hasShuffleRead) shuffleReadQuantiles else Nil,
- if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
+ if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
+ if (hasBytesSpilled) bytesSpilledQuantiles else Nil)
val quantileHeaders = Seq("Metric", "Min", "25th percentile",
"Median", "75th percentile", "Max")
@@ -178,8 +194,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
}
-
- def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean)
+ def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => <span style="display:block;">{e.toString}</span>)
@@ -205,6 +220,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")
+ val maybeBytesSpilled = metrics.map{m => m.bytesSpilled}
+ val bytesSpilledSortable = maybeBytesSpilled.map(_.toString).getOrElse("")
+ val bytesSpilledReadable = maybeBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+
<tr>
<td>{info.index}</td>
<td>{info.taskId}</td>
@@ -234,6 +253,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
{shuffleWriteReadable}
</td>
}}
+ {if (bytesSpilled) {
+ <td sorttable_customkey={bytesSpilledSortable}>
+ {bytesSpilledReadable}
+ </td>
+ }}
<td>{exception.map(e =>
<span>
{e.className} ({e.description})<br/>
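
The new quantile row is built like the shuffle columns: collect the per-task spill sizes, take min, 25th percentile, median, 75th percentile and max, and render each with a byte formatter. A rough stand-alone sketch of that summary step; the nearest-rank helper and the formatter below are simplifications, not Spark's Distribution or Utils.bytesToString:

object SpillQuantiles {
  private val KB = 1L << 10
  private val MB = 1L << 20

  // Nearest-rank percentile over a sorted copy of the per-task values
  def quantile(values: Seq[Double], p: Double): Double = {
    val sorted = values.sorted
    sorted((p * (sorted.size - 1)).round.toInt)
  }

  // Roughly what a byte formatter does for the UI cells
  def bytesToString(size: Double): String =
    if (size >= MB) f"${size / MB}%.1f MB"
    else if (size >= KB) f"${size / KB}%.1f KB"
    else f"$size%.0f B"

  def main(args: Array[String]): Unit = {
    val bytesSpilledSizes = Seq(0.0, 512.0, 4096.0, 1048576.0, 2097152.0)
    val row = "Bytes Spilled" +:
      Seq(0.0, 0.25, 0.5, 0.75, 1.0).map(p => bytesToString(quantile(bytesSpilledSizes, p)))
    println(row.mkString(" | "))
  }
}
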
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index e3bcd895aa..0100083682 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -77,7 +77,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
}
// Number of pairs in the in-memory map
- private var numPairsInMemory = 0
+ private var numPairsInMemory = 0L
// Number of in-memory pairs inserted before tracking the map's shuffle memory usage
private val trackMemoryThreshold = 1000
@@ -85,6 +85,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
// How many times we have spilled so far
private var spillCount = 0
+ // Number of bytes spilled in total
+ private var bytesSpilled = 0L
+
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
private val comparator = new KCComparator[K, C]
@@ -161,6 +164,14 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
shuffleMemoryMap(Thread.currentThread().getId) = 0
}
numPairsInMemory = 0
+ bytesSpilled += mapSize
+ }
+
+ /**
+ * Register the total number of bytes spilled by this task
+ */
+ def registerBytesSpilled(taskId: Long) {
+ SparkEnv.get.bytesSpilledMap(taskId) = bytesSpilled
}
/**
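
Inside the map itself, each spill adds the in-memory map's estimated size to a running total, and registerBytesSpilled later publishes that total under the calling task's id. A simplified, self-contained model of that accounting (SimpleSpillMap, the 16-byte pair cost and the threshold are illustrative only):

import scala.collection.mutable

object SpillAccounting {
  // Stand-in for SparkEnv.get.bytesSpilledMap
  val bytesSpilledMap = mutable.HashMap[Long, Long]()

  // Stand-in for ExternalAppendOnlyMap's spill book-keeping
  class SimpleSpillMap(memoryThreshold: Int) {
    private val current = mutable.ArrayBuffer[(String, Int)]()
    private var spillCount = 0
    private var bytesSpilled = 0L          // running total across all spills

    def insert(kv: (String, Int)): Unit = {
      current += kv
      if (current.size >= memoryThreshold) spill()
    }

    private def spill(): Unit = {
      val mapSize = current.size * 16L     // pretend each pair costs 16 bytes
      spillCount += 1
      bytesSpilled += mapSize              // mirrors `bytesSpilled += mapSize`
      current.clear()                      // a real spill writes to disk first
    }

    // Mirrors registerBytesSpilled(taskId): publish the total for this task
    def registerBytesSpilled(taskId: Long): Unit = {
      bytesSpilledMap(taskId) = bytesSpilled
    }
  }

  def main(args: Array[String]): Unit = {
    val map = new SimpleSpillMap(memoryThreshold = 100)
    (1 to 250).foreach(i => map.insert((s"k$i", i)))   // forces two spills
    map.registerBytesSpilled(taskId = 7L)
    println(bytesSpilledMap(7L))                       // 3200
  }
}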