aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2014-06-29 22:01:42 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2014-06-29 22:01:42 -0700
commit7b71a0e09622e09285a9884ebb67b5fb1c5caa53 (patch)
tree8a6480848bd9d3ed6cbf241d2d07df775eb4dd66 /core/src/main/scala
parentcdf613fc52d7057555a2dbf2241ce90bacbabb4a (diff)
downloadspark-7b71a0e09622e09285a9884ebb67b5fb1c5caa53.tar.gz
spark-7b71a0e09622e09285a9884ebb67b5fb1c5caa53.tar.bz2
spark-7b71a0e09622e09285a9884ebb67b5fb1c5caa53.zip
[SPARK-1683] Track task read metrics.
This commit adds a new metric in TaskMetrics to record the input data size and displays this information in the UI. An earlier version of this commit also added the read time, which can be useful for diagnosing straggler problems, but unfortunately that change introduced a significant performance regression for jobs that don't do much computation. In order to track read time, we'll need to do sampling. The screenshots below show the UI with the new "Input" field, which I added to the stage summary page, the executor summary page, and the per-stage page. ![image](https://cloud.githubusercontent.com/assets/1108612/3167930/2627f92a-eb77-11e3-861c-98ea5bb7a1a2.png) ![image](https://cloud.githubusercontent.com/assets/1108612/3167936/475a889c-eb77-11e3-9706-f11c48751f17.png) ![image](https://cloud.githubusercontent.com/assets/1108612/3167948/80ebcf12-eb77-11e3-87ed-349fce6a770c.png) Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #962 from kayousterhout/read_metrics and squashes the following commits: f13b67d [Kay Ousterhout] Correctly format input bytes on executor page 8b70cde [Kay Ousterhout] Added comment about potential inaccuracy of bytesRead d1016e8 [Kay Ousterhout] Udated SparkListenerSuite test 8461492 [Kay Ousterhout] Miniscule style fix ae04d99 [Kay Ousterhout] Remove input metrics for parallel collections 719f19d [Kay Ousterhout] Style fixes bb6ec62 [Kay Ousterhout] Small fixes 869ac7b [Kay Ousterhout] Updated Json tests 44a0301 [Kay Ousterhout] Fixed accidentally added line 4bd0568 [Kay Ousterhout] Added input source, renamed Hdfs to Hadoop. f27e535 [Kay Ousterhout] Updates based on review comments and to fix rebase bf41029 [Kay Ousterhout] Updated Json tests to pass 0fc33e0 [Kay Ousterhout] Added explicit backward compatibility test 4e52925 [Kay Ousterhout] Added Json output and associated tests. 365400b [Kay Ousterhout] [SPARK-1683] Track task read metrics.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/CacheManager.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala63
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala41
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala20
16 files changed, 196 insertions, 48 deletions
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 3f667a4a0f..8f867686a0 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import scala.collection.mutable.{ArrayBuffer, HashSet}
+import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
@@ -41,9 +42,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
- case Some(values) =>
+ case Some(blockResult) =>
// Partition is already materialized, so just return its values
- new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+ context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+ new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
case None =>
// Acquire a lock for loading this partition
@@ -110,7 +112,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
loading.add(id)
}
- values.map(_.asInstanceOf[Iterator[T]])
+ values.map(_.data.asInstanceOf[Iterator[T]])
}
}
}
@@ -132,7 +134,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
* exceptions that can be avoided. */
updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
blockManager.get(key) match {
- case Some(v) => v.asInstanceOf[Iterator[T]]
+ case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 350fd74173..ac73288442 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -67,6 +67,12 @@ class TaskMetrics extends Serializable {
var diskBytesSpilled: Long = _
/**
+ * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
+ * are stored here.
+ */
+ var inputMetrics: Option[InputMetrics] = None
+
+ /**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
@@ -87,6 +93,29 @@ private[spark] object TaskMetrics {
def empty: TaskMetrics = new TaskMetrics
}
+/**
+ * :: DeveloperApi ::
+ * Method by which input data was read. Network means that the data was read over the network
+ * from a remote block manager (which may have stored the data on-disk or in-memory).
+ */
+@DeveloperApi
+object DataReadMethod extends Enumeration with Serializable {
+ type DataReadMethod = Value
+ val Memory, Disk, Hadoop, Network = Value
+}
+
+/**
+ * :: DeveloperApi ::
+ * Metrics about reading input data.
+ */
+@DeveloperApi
+case class InputMetrics(readMethod: DataReadMethod.Value) {
+ /**
+ * Total bytes read.
+ */
+ var bytesRead: Long = 0L
+}
+
/**
* :: DeveloperApi ::
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index c64da8804d..2673ec2250 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -46,7 +46,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
val blockManager = SparkEnv.get.blockManager
val blockId = split.asInstanceOf[BlockRDDPartition].blockId
blockManager.get(blockId) match {
- case Some(block) => block.asInstanceOf[Iterator[T]]
+ case Some(block) => block.data.asInstanceOf[Iterator[T]]
case None =>
throw new Exception("Could not compute split, block " + blockId + " not found")
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2aa111d600..98dcbf4e2d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -38,6 +38,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.util.NextIterator
/**
@@ -196,6 +197,20 @@ class HadoopRDD[K, V](
context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()
+
+ // Set the task input metrics.
+ val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ try {
+ /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
+ * always at record boundaries, so tasks may need to read into other splits to complete
+ * a record. */
+ inputMetrics.bytesRead = split.inputSplit.value.getLength()
+ } catch {
+ case e: java.io.IOException =>
+ logWarning("Unable to get input size to set InputMetrics for task", e)
+ }
+ context.taskMetrics.inputMetrics = Some(inputMetrics)
+
override def getNext() = {
try {
finished = !reader.next(key, value)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index ac1ccc06f2..f2b3a64bf1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -31,6 +31,7 @@ import org.apache.spark.Logging
import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
private[spark] class NewHadoopPartition(
rddId: Int,
@@ -112,6 +113,18 @@ class NewHadoopRDD[K, V](
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+ val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ try {
+ /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
+ * always at record boundaries, so tasks may need to read into other splits to complete
+ * a record. */
+ inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength()
+ } catch {
+ case e: Exception =>
+ logWarning("Unable to get input split size in order to set task input bytes", e)
+ }
+ context.taskMetrics.inputMetrics = Some(inputMetrics)
+
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => close())
var havePair = false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index a1e21cad48..47dd112f68 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -26,7 +26,9 @@ import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
/**
* :: DeveloperApi ::
@@ -160,7 +162,13 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
" START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
" EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname
val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
- val readMetrics = taskMetrics.shuffleReadMetrics match {
+ val inputMetrics = taskMetrics.inputMetrics match {
+ case Some(metrics) =>
+ " READ_METHOD=" + metrics.readMethod.toString +
+ " INPUT_BYTES=" + metrics.bytesRead
+ case None => ""
+ }
+ val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
case Some(metrics) =>
" SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
@@ -174,7 +182,8 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
case None => ""
}
- stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics)
+ stageLogInfo(stageId, status + info + executorRunTime + inputMetrics + shuffleReadMetrics +
+ writeMetrics)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d2f7baf928..0db0a5bc73 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,6 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
import sun.nio.ch.DirectBuffer
import org.apache.spark._
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
@@ -39,6 +40,15 @@ private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValu
private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
+/* Class for returning a fetched block and associated metrics. */
+private[spark] class BlockResult(
+ val data: Iterator[Any],
+ readMethod: DataReadMethod.Value,
+ bytes: Long) {
+ val inputMetrics = new InputMetrics(readMethod)
+ inputMetrics.bytesRead = bytes
+}
+
private[spark] class BlockManager(
executorId: String,
actorSystem: ActorSystem,
@@ -334,9 +344,9 @@ private[spark] class BlockManager(
/**
* Get block from local block manager.
*/
- def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
+ def getLocal(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting local block $blockId")
- doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+ doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}
/**
@@ -355,11 +365,11 @@ private[spark] class BlockManager(
blockId, s"Block $blockId not found on disk, though it should be")
}
} else {
- doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+ doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
}
- private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
+ private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
@@ -386,14 +396,14 @@ private[spark] class BlockManager(
// Look for the block in memory
if (level.useMemory) {
logDebug(s"Getting block $blockId from memory")
- val result = if (asValues) {
- memoryStore.getValues(blockId)
+ val result = if (asBlockResult) {
+ memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
memoryStore.getBytes(blockId)
}
result match {
case Some(values) =>
- return Some(values)
+ return result
case None =>
logDebug(s"Block $blockId not found in memory")
}
@@ -405,10 +415,11 @@ private[spark] class BlockManager(
if (tachyonStore.contains(blockId)) {
tachyonStore.getBytes(blockId) match {
case Some(bytes) =>
- if (!asValues) {
+ if (!asBlockResult) {
return Some(bytes)
} else {
- return Some(dataDeserialize(blockId, bytes))
+ return Some(new BlockResult(
+ dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
}
case None =>
logDebug(s"Block $blockId not found in tachyon")
@@ -429,14 +440,15 @@ private[spark] class BlockManager(
if (!level.useMemory) {
// If the block shouldn't be stored in memory, we can just return it
- if (asValues) {
- return Some(dataDeserialize(blockId, bytes))
+ if (asBlockResult) {
+ return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
+ info.size))
} else {
return Some(bytes)
}
} else {
// Otherwise, we also have to store something in the memory store
- if (!level.deserialized || !asValues) {
+ if (!level.deserialized || !asBlockResult) {
/* We'll store the bytes in memory if the block's storage level includes
* "memory serialized", or if it should be cached as objects in memory
* but we only requested its serialized bytes. */
@@ -445,7 +457,7 @@ private[spark] class BlockManager(
memoryStore.putBytes(blockId, copyForMemory, level)
bytes.rewind()
}
- if (!asValues) {
+ if (!asBlockResult) {
return Some(bytes)
} else {
val values = dataDeserialize(blockId, bytes)
@@ -457,12 +469,12 @@ private[spark] class BlockManager(
memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
match {
case Left(values2) =>
- return Some(values2)
+ return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
case _ =>
- throw new SparkException("Memory store did not return an iterator")
+ throw new SparkException("Memory store did not return back an iterator")
}
} else {
- return Some(values)
+ return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
}
}
}
@@ -477,9 +489,9 @@ private[spark] class BlockManager(
/**
* Get block from remote block managers.
*/
- def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
+ def getRemote(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting remote block $blockId")
- doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+ doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}
/**
@@ -487,10 +499,10 @@ private[spark] class BlockManager(
*/
def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
logDebug(s"Getting remote block $blockId as bytes")
- doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+ doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
- private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
+ private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
val locations = Random.shuffle(master.getLocations(blockId))
for (loc <- locations) {
@@ -498,8 +510,11 @@ private[spark] class BlockManager(
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
- if (asValues) {
- return Some(dataDeserialize(blockId, data))
+ if (asBlockResult) {
+ return Some(new BlockResult(
+ dataDeserialize(blockId, data),
+ DataReadMethod.Network,
+ data.limit()))
} else {
return Some(data)
}
@@ -513,7 +528,7 @@ private[spark] class BlockManager(
/**
* Get a block from the block manager (either local or remote).
*/
- def get(blockId: BlockId): Option[Iterator[Any]] = {
+ def get(blockId: BlockId): Option[BlockResult] = {
val local = getLocal(blockId)
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
@@ -792,7 +807,7 @@ private[spark] class BlockManager(
* Read a block consisting of a single object.
*/
def getSingle(blockId: BlockId): Option[Any] = {
- get(blockId).map(_.next())
+ get(blockId).map(_.data.next())
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index a107c5182b..328be158db 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -78,7 +78,7 @@ private[spark] object ThreadingTest {
val startTime = System.currentTimeMillis()
manager.get(blockId) match {
case Some(retrievedBlock) =>
- assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
+ assert(retrievedBlock.data.toList.asInstanceOf[List[Int]] == block.toList,
"Block " + blockId + " did not match")
println("Got block " + blockId + " in " +
(System.currentTimeMillis - startTime) + " ms")
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 6cfc46c7e7..9625337ae2 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -72,6 +72,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
"Complete Tasks",
"Total Tasks",
"Task Time",
+ "Input Bytes",
"Shuffle Read",
"Shuffle Write")
@@ -97,6 +98,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
<td>{values("Complete Tasks")}</td>
<td>{values("Total Tasks")}</td>
<td sorttable_customkey={values("Task Time")}>{Utils.msDurationToString(values("Task Time").toLong)}</td>
+ <td sorttable_customkey={values("Input Bytes")}>{Utils.bytesToString(values("Input Bytes").toLong)}</td>
<td sorttable_customkey={values("Shuffle Read")}>{Utils.bytesToString(values("Shuffle Read").toLong)}</td>
<td sorttable_customkey={values("Shuffle Write")} >{Utils.bytesToString(values("Shuffle Write").toLong)}</td>
</tr>
@@ -119,6 +121,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+ val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
@@ -136,6 +139,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
completedTasks,
totalTasks,
totalDuration,
+ totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
maxMem
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 91d37b835b..58eeb86bf9 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -46,6 +46,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener)
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
+ val executorToInputBytes = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
@@ -72,6 +73,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener)
// Update shuffle read/write
val metrics = taskEnd.taskMetrics
if (metrics != null) {
+ metrics.inputMetrics.foreach { inputMetrics =>
+ executorToInputBytes(eid) =
+ executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
+ }
metrics.shuffleReadMetrics.foreach { shuffleRead =>
executorToShuffleRead(eid) =
executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 2aaf6329b7..c4a8996c0b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -28,6 +28,7 @@ class ExecutorSummary {
var taskTime : Long = 0
var failedTasks : Int = 0
var succeededTasks : Int = 0
+ var inputBytes: Long = 0
var shuffleRead : Long = 0
var shuffleWrite : Long = 0
var memoryBytesSpilled : Long = 0
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index add0e9878a..2a34a9af92 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -43,6 +43,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
<th>Total Tasks</th>
<th>Failed Tasks</th>
<th>Succeeded Tasks</th>
+ <th>Input Bytes</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
<th>Shuffle Spill (Memory)</th>
@@ -75,6 +76,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
+ <td sorttable_customekey={v.inputBytes.toString}>{Utils.bytesToString(v.inputBytes)}</td>
<td sorttable_customekey={v.shuffleRead.toString}>{Utils.bytesToString(v.shuffleRead)}</td>
<td sorttable_customekey={v.shuffleWrite.toString}>{Utils.bytesToString(v.shuffleWrite)}</td>
<td sorttable_customekey={v.memoryBytesSpilled.toString} >{Utils.bytesToString(v.memoryBytesSpilled)}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 381a5443df..2286a7f952 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -46,13 +46,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()
- // Total metrics reflect metrics only for completed tasks
- var totalTime = 0L
- var totalShuffleRead = 0L
- var totalShuffleWrite = 0L
-
// TODO: Should probably consolidate all following into a single hash map.
val stageIdToTime = HashMap[Int, Long]()
+ val stageIdToInputBytes = HashMap[Int, Long]()
val stageIdToShuffleRead = HashMap[Int, Long]()
val stageIdToShuffleWrite = HashMap[Int, Long]()
val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
@@ -93,6 +89,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s =>
stageIdToTime.remove(s.stageId)
+ stageIdToInputBytes.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
stageIdToShuffleWrite.remove(s.stageId)
stageIdToMemoryBytesSpilled.remove(s.stageId)
@@ -171,6 +168,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val metrics = taskEnd.taskMetrics
if (metrics != null) {
+ metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
y.memoryBytesSpilled += metrics.memoryBytesSpilled
@@ -200,18 +198,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
stageIdToTime.getOrElseUpdate(sid, 0L)
val time = metrics.map(_.executorRunTime).getOrElse(0L)
stageIdToTime(sid) += time
- totalTime += time
+
+ stageIdToInputBytes.getOrElseUpdate(sid, 0L)
+ val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
+ stageIdToInputBytes(sid) += inputBytes
stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
stageIdToShuffleRead(sid) += shuffleRead
- totalShuffleRead += shuffleRead
stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
val shuffleWrite =
metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
stageIdToShuffleWrite(sid) += shuffleWrite
- totalShuffleWrite += shuffleWrite
stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8e3d5d1cd4..afb8ed754f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -48,6 +48,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
val numCompleted = tasks.count(_.taskInfo.finished)
+ val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L)
+ val hasInput = inputBytes > 0
val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
val hasShuffleRead = shuffleReadBytes > 0
val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
@@ -69,6 +71,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<strong>Total task time across all tasks: </strong>
{UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
+ {if (hasInput)
+ <li>
+ <strong>Input: </strong>
+ {Utils.bytesToString(inputBytes)}
+ </li>
+ }
{if (hasShuffleRead)
<li>
<strong>Shuffle read: </strong>
@@ -98,13 +106,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
Seq(
"Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
"Launch Time", "Duration", "GC Time") ++
+ {if (hasInput) Seq("Input") else Nil} ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
{if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
Seq("Errors")
val taskTable = UIUtils.listingTable(
- taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
+ taskHeaders, taskRow(hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
@@ -159,6 +168,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
def getQuantileCols(data: Seq[Double]) =
Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
+ val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
+ }
+ val inputQuantiles = "Input" +: getQuantileCols(inputSizes)
+
val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
@@ -186,6 +200,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
serviceQuantiles,
gettingResultQuantiles,
schedulerDelayQuantiles,
+ if (hasInput) inputQuantiles else Nil,
if (hasShuffleRead) shuffleReadQuantiles else Nil,
if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil,
@@ -209,8 +224,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
}
}
- def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
- (taskData: TaskUIData): Seq[Node] = {
+ def taskRow(
+ hasInput: Boolean,
+ hasShuffleRead: Boolean,
+ hasShuffleWrite: Boolean,
+ hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = {
taskData match { case TaskUIData(info, metrics, errorMessage) =>
val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
else metrics.map(_.executorRunTime).getOrElse(1L)
@@ -219,6 +237,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
+ val maybeInput = metrics.flatMap(_.inputMetrics)
+ val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
+ val inputReadable = maybeInput
+ .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
+ .getOrElse("")
+
val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
@@ -265,12 +289,17 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
{if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
</td>
-->
- {if (shuffleRead) {
+ {if (hasInput) {
+ <td sorttable_customkey={inputSortable}>
+ {inputReadable}
+ </td>
+ }}
+ {if (hasShuffleRead) {
<td sorttable_customkey={shuffleReadSortable}>
{shuffleReadReadable}
</td>
}}
- {if (shuffleWrite) {
+ {if (hasShuffleWrite) {
<td sorttable_customkey={writeTimeSortable}>
{writeTimeReadable}
</td>
@@ -278,7 +307,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
{shuffleWriteReadable}
</td>
}}
- {if (bytesSpilled) {
+ {if (hasBytesSpilled) {
<td sorttable_customkey={memoryBytesSpilledSortable}>
{memoryBytesSpilledReadable}
</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index b2b6cc6a6e..a9ac6d5bee 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -43,6 +43,7 @@ private[ui] class StageTableBase(
<th>Submitted</th>
<th>Duration</th>
<th>Tasks: Succeeded/Total</th>
+ <th>Input</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
}
@@ -123,6 +124,11 @@ private[ui] class StageTableBase(
case _ => ""
}
val totalTasks = s.numTasks
+ val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L)
+ val inputRead = inputSortable match {
+ case 0 => ""
+ case b => Utils.bytesToString(b)
+ }
val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L)
val shuffleRead = shuffleReadSortable match {
case 0 => ""
@@ -150,6 +156,7 @@ private[ui] class StageTableBase(
<td class="progress-cell">
{makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
</td>
+ <td sorttable_customekey={inputSortable.toString}>{inputRead}</td>
<td sorttable_customekey={shuffleReadSortable.toString}>{shuffleRead}</td>
<td sorttable_customekey={shuffleWriteSortable.toString}>{shuffleWrite}</td>
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6245b4b802..26c9c9d603 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -26,7 +26,8 @@ import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
-import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
+ ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.scheduler._
import org.apache.spark.storage._
import org.apache.spark._
@@ -213,6 +214,8 @@ private[spark] object JsonProtocol {
taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing)
val shuffleWriteMetrics =
taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
+ val inputMetrics =
+ taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing)
val updatedBlocks =
taskMetrics.updatedBlocks.map { blocks =>
JArray(blocks.toList.map { case (id, status) =>
@@ -230,6 +233,7 @@ private[spark] object JsonProtocol {
("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
("Shuffle Read Metrics" -> shuffleReadMetrics) ~
("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
+ ("Input Metrics" -> inputMetrics) ~
("Updated Blocks" -> updatedBlocks)
}
@@ -247,6 +251,11 @@ private[spark] object JsonProtocol {
("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime)
}
+ def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
+ ("Data Read Method" -> inputMetrics.readMethod.toString) ~
+ ("Bytes Read" -> inputMetrics.bytesRead)
+ }
+
def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
val reason = Utils.getFormattedClassName(taskEndReason)
val json = taskEndReason match {
@@ -528,6 +537,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
+ metrics.inputMetrics =
+ Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
metrics.updatedBlocks =
Utils.jsonOption(json \ "Updated Blocks").map { value =>
value.extract[List[JValue]].map { block =>
@@ -557,6 +568,13 @@ private[spark] object JsonProtocol {
metrics
}
+ def inputMetricsFromJson(json: JValue): InputMetrics = {
+ val metrics = new InputMetrics(
+ DataReadMethod.withName((json \ "Data Read Method").extract[String]))
+ metrics.bytesRead = (json \ "Bytes Read").extract[Long]
+ metrics
+ }
+
def taskEndReasonFromJson(json: JValue): TaskEndReason = {
val success = Utils.getFormattedClassName(Success)
val resubmitted = Utils.getFormattedClassName(Resubmitted)