author     Kay Ousterhout <kayousterhout@gmail.com>    2014-06-29 22:01:42 -0700
committer  Kay Ousterhout <kayousterhout@gmail.com>    2014-06-29 22:01:42 -0700
commit     7b71a0e09622e09285a9884ebb67b5fb1c5caa53 (patch)
tree       8a6480848bd9d3ed6cbf241d2d07df775eb4dd66
parent     cdf613fc52d7057555a2dbf2241ce90bacbabb4a (diff)
[SPARK-1683] Track task read metrics.
This commit adds a new metric in TaskMetrics to record the input data size and displays this information in the UI.

An earlier version of this commit also added the read time, which can be useful for diagnosing straggler problems, but unfortunately that change introduced a significant performance regression for jobs that don't do much computation. In order to track read time, we'll need to do sampling.

The screenshots below show the UI with the new "Input" field, which I added to the stage summary page, the executor summary page, and the per-stage page.

![image](https://cloud.githubusercontent.com/assets/1108612/3167930/2627f92a-eb77-11e3-861c-98ea5bb7a1a2.png)

![image](https://cloud.githubusercontent.com/assets/1108612/3167936/475a889c-eb77-11e3-9706-f11c48751f17.png)

![image](https://cloud.githubusercontent.com/assets/1108612/3167948/80ebcf12-eb77-11e3-87ed-349fce6a770c.png)

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #962 from kayousterhout/read_metrics and squashes the following commits:

f13b67d [Kay Ousterhout] Correctly format input bytes on executor page
8b70cde [Kay Ousterhout] Added comment about potential inaccuracy of bytesRead
d1016e8 [Kay Ousterhout] Udated SparkListenerSuite test
8461492 [Kay Ousterhout] Miniscule style fix
ae04d99 [Kay Ousterhout] Remove input metrics for parallel collections
719f19d [Kay Ousterhout] Style fixes
bb6ec62 [Kay Ousterhout] Small fixes
869ac7b [Kay Ousterhout] Updated Json tests
44a0301 [Kay Ousterhout] Fixed accidentally added line
4bd0568 [Kay Ousterhout] Added input source, renamed Hdfs to Hadoop.
f27e535 [Kay Ousterhout] Updates based on review comments and to fix rebase
bf41029 [Kay Ousterhout] Updated Json tests to pass
0fc33e0 [Kay Ousterhout] Added explicit backward compatibility test
4e52925 [Kay Ousterhout] Added Json output and associated tests.
365400b [Kay Ousterhout] [SPARK-1683] Track task read metrics.
-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 63
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 41
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 20
-rw-r--r--  core/src/test/scala/org/apache/spark/CacheManagerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 1
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 84
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 102
20 files changed, 349 insertions, 86 deletions
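
For readers skimming the patch, here is a minimal sketch of how the new field surfaces outside the UI: a listener that sums the bytes each finished task reported reading, mirroring the aggregation that ExecutorsListener and JobProgressListener perform below. The class name and the addSparkListener call are illustrative assumptions, not part of this patch.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener (not part of this patch): sums bytes read across tasks.
class InputBytesListener extends SparkListener {
  @volatile var totalInputBytes = 0L

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      // inputMetrics is only defined for tasks that read from Hadoop or from
      // persisted blocks; shuffle reads are reported separately.
      metrics.inputMetrics.foreach { input => totalInputBytes += input.bytesRead }
    }
  }
}

// Assumed usage: sc.addSparkListener(new InputBytesListener())
```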
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 3f667a4a0f..8f867686a0 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import scala.collection.mutable.{ArrayBuffer, HashSet}
+import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
@@ -41,9 +42,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
- case Some(values) =>
+ case Some(blockResult) =>
// Partition is already materialized, so just return its values
- new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+ context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+ new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
case None =>
// Acquire a lock for loading this partition
@@ -110,7 +112,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
loading.add(id)
}
- values.map(_.asInstanceOf[Iterator[T]])
+ values.map(_.data.asInstanceOf[Iterator[T]])
}
}
}
@@ -132,7 +134,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
* exceptions that can be avoided. */
updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
blockManager.get(key) match {
- case Some(v) => v.asInstanceOf[Iterator[T]]
+ case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 350fd74173..ac73288442 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -67,6 +67,12 @@ class TaskMetrics extends Serializable {
var diskBytesSpilled: Long = _
/**
+ * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
+ * are stored here.
+ */
+ var inputMetrics: Option[InputMetrics] = None
+
+ /**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
@@ -87,6 +93,29 @@ private[spark] object TaskMetrics {
def empty: TaskMetrics = new TaskMetrics
}
+/**
+ * :: DeveloperApi ::
+ * Method by which input data was read. Network means that the data was read over the network
+ * from a remote block manager (which may have stored the data on-disk or in-memory).
+ */
+@DeveloperApi
+object DataReadMethod extends Enumeration with Serializable {
+ type DataReadMethod = Value
+ val Memory, Disk, Hadoop, Network = Value
+}
+
+/**
+ * :: DeveloperApi ::
+ * Metrics about reading input data.
+ */
+@DeveloperApi
+case class InputMetrics(readMethod: DataReadMethod.Value) {
+ /**
+ * Total bytes read.
+ */
+ var bytesRead: Long = 0L
+}
+
/**
* :: DeveloperApi ::
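
Condensed from the HadoopRDD and NewHadoopRDD changes below, this is the producer-side pattern for the new metric; `recordInput` is a hypothetical helper, and the visibility of `TaskContext.taskMetrics` outside Spark internals is assumed here rather than verified.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.executor.{DataReadMethod, InputMetrics}

// Hypothetical helper: record where a task's input came from and how big it was.
def recordInput(context: TaskContext, method: DataReadMethod.Value, bytes: Long) {
  val inputMetrics = new InputMetrics(method)
  inputMetrics.bytesRead = bytes
  context.taskMetrics.inputMetrics = Some(inputMetrics)
}
```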
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index c64da8804d..2673ec2250 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -46,7 +46,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
val blockManager = SparkEnv.get.blockManager
val blockId = split.asInstanceOf[BlockRDDPartition].blockId
blockManager.get(blockId) match {
- case Some(block) => block.asInstanceOf[Iterator[T]]
+ case Some(block) => block.data.asInstanceOf[Iterator[T]]
case None =>
throw new Exception("Could not compute split, block " + blockId + " not found")
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2aa111d600..98dcbf4e2d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -38,6 +38,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.util.NextIterator
/**
@@ -196,6 +197,20 @@ class HadoopRDD[K, V](
context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()
+
+ // Set the task input metrics.
+ val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ try {
+ /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
+ * always at record boundaries, so tasks may need to read into other splits to complete
+ * a record. */
+ inputMetrics.bytesRead = split.inputSplit.value.getLength()
+ } catch {
+ case e: java.io.IOException =>
+ logWarning("Unable to get input size to set InputMetrics for task", e)
+ }
+ context.taskMetrics.inputMetrics = Some(inputMetrics)
+
override def getNext() = {
try {
finished = !reader.next(key, value)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index ac1ccc06f2..f2b3a64bf1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -31,6 +31,7 @@ import org.apache.spark.Logging
import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
private[spark] class NewHadoopPartition(
rddId: Int,
@@ -112,6 +113,18 @@ class NewHadoopRDD[K, V](
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+ val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ try {
+ /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
+ * always at record boundaries, so tasks may need to read into other splits to complete
+ * a record. */
+ inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength()
+ } catch {
+ case e: Exception =>
+ logWarning("Unable to get input split size in order to set task input bytes", e)
+ }
+ context.taskMetrics.inputMetrics = Some(inputMetrics)
+
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => close())
var havePair = false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index a1e21cad48..47dd112f68 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -26,7 +26,9 @@ import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
/**
* :: DeveloperApi ::
@@ -160,7 +162,13 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
" START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
" EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname
val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
- val readMetrics = taskMetrics.shuffleReadMetrics match {
+ val inputMetrics = taskMetrics.inputMetrics match {
+ case Some(metrics) =>
+ " READ_METHOD=" + metrics.readMethod.toString +
+ " INPUT_BYTES=" + metrics.bytesRead
+ case None => ""
+ }
+ val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
case Some(metrics) =>
" SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
@@ -174,7 +182,8 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
case None => ""
}
- stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics)
+ stageLogInfo(stageId, status + info + executorRunTime + inputMetrics + shuffleReadMetrics +
+ writeMetrics)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d2f7baf928..0db0a5bc73 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,6 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
import sun.nio.ch.DirectBuffer
import org.apache.spark._
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
@@ -39,6 +40,15 @@ private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValu
private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
+/* Class for returning a fetched block and associated metrics. */
+private[spark] class BlockResult(
+ val data: Iterator[Any],
+ readMethod: DataReadMethod.Value,
+ bytes: Long) {
+ val inputMetrics = new InputMetrics(readMethod)
+ inputMetrics.bytesRead = bytes
+}
+
private[spark] class BlockManager(
executorId: String,
actorSystem: ActorSystem,
@@ -334,9 +344,9 @@ private[spark] class BlockManager(
/**
* Get block from local block manager.
*/
- def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
+ def getLocal(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting local block $blockId")
- doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+ doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}
/**
@@ -355,11 +365,11 @@ private[spark] class BlockManager(
blockId, s"Block $blockId not found on disk, though it should be")
}
} else {
- doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+ doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
}
- private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
+ private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
@@ -386,14 +396,14 @@ private[spark] class BlockManager(
// Look for the block in memory
if (level.useMemory) {
logDebug(s"Getting block $blockId from memory")
- val result = if (asValues) {
- memoryStore.getValues(blockId)
+ val result = if (asBlockResult) {
+ memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
memoryStore.getBytes(blockId)
}
result match {
case Some(values) =>
- return Some(values)
+ return result
case None =>
logDebug(s"Block $blockId not found in memory")
}
@@ -405,10 +415,11 @@ private[spark] class BlockManager(
if (tachyonStore.contains(blockId)) {
tachyonStore.getBytes(blockId) match {
case Some(bytes) =>
- if (!asValues) {
+ if (!asBlockResult) {
return Some(bytes)
} else {
- return Some(dataDeserialize(blockId, bytes))
+ return Some(new BlockResult(
+ dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
}
case None =>
logDebug(s"Block $blockId not found in tachyon")
@@ -429,14 +440,15 @@ private[spark] class BlockManager(
if (!level.useMemory) {
// If the block shouldn't be stored in memory, we can just return it
- if (asValues) {
- return Some(dataDeserialize(blockId, bytes))
+ if (asBlockResult) {
+ return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
+ info.size))
} else {
return Some(bytes)
}
} else {
// Otherwise, we also have to store something in the memory store
- if (!level.deserialized || !asValues) {
+ if (!level.deserialized || !asBlockResult) {
/* We'll store the bytes in memory if the block's storage level includes
* "memory serialized", or if it should be cached as objects in memory
* but we only requested its serialized bytes. */
@@ -445,7 +457,7 @@ private[spark] class BlockManager(
memoryStore.putBytes(blockId, copyForMemory, level)
bytes.rewind()
}
- if (!asValues) {
+ if (!asBlockResult) {
return Some(bytes)
} else {
val values = dataDeserialize(blockId, bytes)
@@ -457,12 +469,12 @@ private[spark] class BlockManager(
memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
match {
case Left(values2) =>
- return Some(values2)
+ return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
case _ =>
- throw new SparkException("Memory store did not return an iterator")
+ throw new SparkException("Memory store did not return back an iterator")
}
} else {
- return Some(values)
+ return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
}
}
}
@@ -477,9 +489,9 @@ private[spark] class BlockManager(
/**
* Get block from remote block managers.
*/
- def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
+ def getRemote(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting remote block $blockId")
- doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+ doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}
/**
@@ -487,10 +499,10 @@ private[spark] class BlockManager(
*/
def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
logDebug(s"Getting remote block $blockId as bytes")
- doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+ doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
- private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
+ private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
val locations = Random.shuffle(master.getLocations(blockId))
for (loc <- locations) {
@@ -498,8 +510,11 @@ private[spark] class BlockManager(
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
- if (asValues) {
- return Some(dataDeserialize(blockId, data))
+ if (asBlockResult) {
+ return Some(new BlockResult(
+ dataDeserialize(blockId, data),
+ DataReadMethod.Network,
+ data.limit()))
} else {
return Some(data)
}
@@ -513,7 +528,7 @@ private[spark] class BlockManager(
/**
* Get a block from the block manager (either local or remote).
*/
- def get(blockId: BlockId): Option[Iterator[Any]] = {
+ def get(blockId: BlockId): Option[BlockResult] = {
val local = getLocal(blockId)
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
@@ -792,7 +807,7 @@ private[spark] class BlockManager(
* Read a block consisting of a single object.
*/
def getSingle(blockId: BlockId): Option[Any] = {
- get(blockId).map(_.next())
+ get(blockId).map(_.data.next())
}
/**
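
The consumer side of BlockResult, condensed from the CacheManager change above; a sketch only, since BlockResult is private[spark] and the rddId/partitionIndex arguments are placeholders.

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.storage.RDDBlockId

// Hypothetical Spark-internal helper: fetch a cached partition and report what
// the read cost. The iterator now lives under `data`; the accounting (Memory,
// Disk or Network, plus bytes) travels alongside it in `inputMetrics`.
def fetchWithMetrics(rddId: Int, partitionIndex: Int): Iterator[Any] = {
  SparkEnv.get.blockManager.get(RDDBlockId(rddId, partitionIndex)) match {
    case Some(blockResult) =>
      println(s"Read ${blockResult.inputMetrics.bytesRead} bytes via " +
        blockResult.inputMetrics.readMethod)
      blockResult.data
    case None => Iterator.empty
  }
}
```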
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index a107c5182b..328be158db 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -78,7 +78,7 @@ private[spark] object ThreadingTest {
val startTime = System.currentTimeMillis()
manager.get(blockId) match {
case Some(retrievedBlock) =>
- assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
+ assert(retrievedBlock.data.toList.asInstanceOf[List[Int]] == block.toList,
"Block " + blockId + " did not match")
println("Got block " + blockId + " in " +
(System.currentTimeMillis - startTime) + " ms")
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 6cfc46c7e7..9625337ae2 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -72,6 +72,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
"Complete Tasks",
"Total Tasks",
"Task Time",
+ "Input Bytes",
"Shuffle Read",
"Shuffle Write")
@@ -97,6 +98,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
<td>{values("Complete Tasks")}</td>
<td>{values("Total Tasks")}</td>
<td sorttable_customkey={values("Task Time")}>{Utils.msDurationToString(values("Task Time").toLong)}</td>
+ <td sorttable_customkey={values("Input Bytes")}>{Utils.bytesToString(values("Input Bytes").toLong)}</td>
<td sorttable_customkey={values("Shuffle Read")}>{Utils.bytesToString(values("Shuffle Read").toLong)}</td>
<td sorttable_customkey={values("Shuffle Write")} >{Utils.bytesToString(values("Shuffle Write").toLong)}</td>
</tr>
@@ -119,6 +121,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+ val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
@@ -136,6 +139,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
completedTasks,
totalTasks,
totalDuration,
+ totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
maxMem
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 91d37b835b..58eeb86bf9 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -46,6 +46,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener)
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
+ val executorToInputBytes = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
@@ -72,6 +73,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener)
// Update shuffle read/write
val metrics = taskEnd.taskMetrics
if (metrics != null) {
+ metrics.inputMetrics.foreach { inputMetrics =>
+ executorToInputBytes(eid) =
+ executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
+ }
metrics.shuffleReadMetrics.foreach { shuffleRead =>
executorToShuffleRead(eid) =
executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 2aaf6329b7..c4a8996c0b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -28,6 +28,7 @@ class ExecutorSummary {
var taskTime : Long = 0
var failedTasks : Int = 0
var succeededTasks : Int = 0
+ var inputBytes: Long = 0
var shuffleRead : Long = 0
var shuffleWrite : Long = 0
var memoryBytesSpilled : Long = 0
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index add0e9878a..2a34a9af92 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -43,6 +43,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
<th>Total Tasks</th>
<th>Failed Tasks</th>
<th>Succeeded Tasks</th>
+ <th>Input Bytes</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
<th>Shuffle Spill (Memory)</th>
@@ -75,6 +76,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
+ <td sorttable_customekey={v.inputBytes.toString}>{Utils.bytesToString(v.inputBytes)}</td>
<td sorttable_customekey={v.shuffleRead.toString}>{Utils.bytesToString(v.shuffleRead)}</td>
<td sorttable_customekey={v.shuffleWrite.toString}>{Utils.bytesToString(v.shuffleWrite)}</td>
<td sorttable_customekey={v.memoryBytesSpilled.toString} >{Utils.bytesToString(v.memoryBytesSpilled)}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 381a5443df..2286a7f952 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -46,13 +46,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()
- // Total metrics reflect metrics only for completed tasks
- var totalTime = 0L
- var totalShuffleRead = 0L
- var totalShuffleWrite = 0L
-
// TODO: Should probably consolidate all following into a single hash map.
val stageIdToTime = HashMap[Int, Long]()
+ val stageIdToInputBytes = HashMap[Int, Long]()
val stageIdToShuffleRead = HashMap[Int, Long]()
val stageIdToShuffleWrite = HashMap[Int, Long]()
val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
@@ -93,6 +89,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s =>
stageIdToTime.remove(s.stageId)
+ stageIdToInputBytes.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
stageIdToShuffleWrite.remove(s.stageId)
stageIdToMemoryBytesSpilled.remove(s.stageId)
@@ -171,6 +168,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val metrics = taskEnd.taskMetrics
if (metrics != null) {
+ metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
y.memoryBytesSpilled += metrics.memoryBytesSpilled
@@ -200,18 +198,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
stageIdToTime.getOrElseUpdate(sid, 0L)
val time = metrics.map(_.executorRunTime).getOrElse(0L)
stageIdToTime(sid) += time
- totalTime += time
+
+ stageIdToInputBytes.getOrElseUpdate(sid, 0L)
+ val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
+ stageIdToInputBytes(sid) += inputBytes
stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
stageIdToShuffleRead(sid) += shuffleRead
- totalShuffleRead += shuffleRead
stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
val shuffleWrite =
metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
stageIdToShuffleWrite(sid) += shuffleWrite
- totalShuffleWrite += shuffleWrite
stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8e3d5d1cd4..afb8ed754f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -48,6 +48,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
val numCompleted = tasks.count(_.taskInfo.finished)
+ val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L)
+ val hasInput = inputBytes > 0
val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
val hasShuffleRead = shuffleReadBytes > 0
val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
@@ -69,6 +71,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<strong>Total task time across all tasks: </strong>
{UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
+ {if (hasInput)
+ <li>
+ <strong>Input: </strong>
+ {Utils.bytesToString(inputBytes)}
+ </li>
+ }
{if (hasShuffleRead)
<li>
<strong>Shuffle read: </strong>
@@ -98,13 +106,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
Seq(
"Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
"Launch Time", "Duration", "GC Time") ++
+ {if (hasInput) Seq("Input") else Nil} ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
{if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
Seq("Errors")
val taskTable = UIUtils.listingTable(
- taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
+ taskHeaders, taskRow(hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
@@ -159,6 +168,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
def getQuantileCols(data: Seq[Double]) =
Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
+ val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
+ }
+ val inputQuantiles = "Input" +: getQuantileCols(inputSizes)
+
val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
@@ -186,6 +200,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
serviceQuantiles,
gettingResultQuantiles,
schedulerDelayQuantiles,
+ if (hasInput) inputQuantiles else Nil,
if (hasShuffleRead) shuffleReadQuantiles else Nil,
if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil,
@@ -209,8 +224,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
}
}
- def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
- (taskData: TaskUIData): Seq[Node] = {
+ def taskRow(
+ hasInput: Boolean,
+ hasShuffleRead: Boolean,
+ hasShuffleWrite: Boolean,
+ hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = {
taskData match { case TaskUIData(info, metrics, errorMessage) =>
val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
else metrics.map(_.executorRunTime).getOrElse(1L)
@@ -219,6 +237,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
+ val maybeInput = metrics.flatMap(_.inputMetrics)
+ val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
+ val inputReadable = maybeInput
+ .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
+ .getOrElse("")
+
val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
@@ -265,12 +289,17 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
{if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
</td>
-->
- {if (shuffleRead) {
+ {if (hasInput) {
+ <td sorttable_customkey={inputSortable}>
+ {inputReadable}
+ </td>
+ }}
+ {if (hasShuffleRead) {
<td sorttable_customkey={shuffleReadSortable}>
{shuffleReadReadable}
</td>
}}
- {if (shuffleWrite) {
+ {if (hasShuffleWrite) {
<td sorttable_customkey={writeTimeSortable}>
{writeTimeReadable}
</td>
@@ -278,7 +307,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
{shuffleWriteReadable}
</td>
}}
- {if (bytesSpilled) {
+ {if (hasBytesSpilled) {
<td sorttable_customkey={memoryBytesSpilledSortable}>
{memoryBytesSpilledReadable}
</td>
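
The "Input" row of the stage summary table is built with the same quantile helper used for the other per-task metrics. A sketch with invented sizes follows; Distribution is a private[spark] utility, so this only compiles inside Spark's own packages, and the default quantile probabilities are assumed rather than quoted from its source.

```scala
import org.apache.spark.util.Distribution

// Invented per-task input sizes, standing in for
// metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble.
val inputSizes: Seq[Double] = Seq(12e6, 15e6, 64e6, 13e6)

// Assumed default quantiles: min, 25th percentile, median, 75th percentile, max.
val quantiles = Distribution(inputSizes).get.getQuantiles()
println(quantiles.map(_.toLong).mkString(", "))
```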
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index b2b6cc6a6e..a9ac6d5bee 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -43,6 +43,7 @@ private[ui] class StageTableBase(
<th>Submitted</th>
<th>Duration</th>
<th>Tasks: Succeeded/Total</th>
+ <th>Input</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
}
@@ -123,6 +124,11 @@ private[ui] class StageTableBase(
case _ => ""
}
val totalTasks = s.numTasks
+ val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L)
+ val inputRead = inputSortable match {
+ case 0 => ""
+ case b => Utils.bytesToString(b)
+ }
val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L)
val shuffleRead = shuffleReadSortable match {
case 0 => ""
@@ -150,6 +156,7 @@ private[ui] class StageTableBase(
<td class="progress-cell">
{makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
</td>
+ <td sorttable_customekey={inputSortable.toString}>{inputRead}</td>
<td sorttable_customekey={shuffleReadSortable.toString}>{shuffleRead}</td>
<td sorttable_customekey={shuffleWriteSortable.toString}>{shuffleWrite}</td>
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6245b4b802..26c9c9d603 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -26,7 +26,8 @@ import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
-import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
+ ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.scheduler._
import org.apache.spark.storage._
import org.apache.spark._
@@ -213,6 +214,8 @@ private[spark] object JsonProtocol {
taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing)
val shuffleWriteMetrics =
taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
+ val inputMetrics =
+ taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing)
val updatedBlocks =
taskMetrics.updatedBlocks.map { blocks =>
JArray(blocks.toList.map { case (id, status) =>
@@ -230,6 +233,7 @@ private[spark] object JsonProtocol {
("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
("Shuffle Read Metrics" -> shuffleReadMetrics) ~
("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
+ ("Input Metrics" -> inputMetrics) ~
("Updated Blocks" -> updatedBlocks)
}
@@ -247,6 +251,11 @@ private[spark] object JsonProtocol {
("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime)
}
+ def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
+ ("Data Read Method" -> inputMetrics.readMethod.toString) ~
+ ("Bytes Read" -> inputMetrics.bytesRead)
+ }
+
def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
val reason = Utils.getFormattedClassName(taskEndReason)
val json = taskEndReason match {
@@ -528,6 +537,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
+ metrics.inputMetrics =
+ Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
metrics.updatedBlocks =
Utils.jsonOption(json \ "Updated Blocks").map { value =>
value.extract[List[JValue]].map { block =>
@@ -557,6 +568,13 @@ private[spark] object JsonProtocol {
metrics
}
+ def inputMetricsFromJson(json: JValue): InputMetrics = {
+ val metrics = new InputMetrics(
+ DataReadMethod.withName((json \ "Data Read Method").extract[String]))
+ metrics.bytesRead = (json \ "Bytes Read").extract[Long]
+ metrics
+ }
+
def taskEndReasonFromJson(json: JValue): TaskEndReason = {
val success = Utils.getFormattedClassName(Success)
val resubmitted = Utils.getFormattedClassName(Resubmitted)
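
A round-trip sketch for the new JSON fields, matching the "Input Metrics" block in the test fixture further down; JsonProtocol is private[spark], so this is illustrative rather than a user-facing API, and the byte count is invented.

```scala
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.util.JsonProtocol
import org.json4s.jackson.JsonMethods.{compact, render}

val metrics = new InputMetrics(DataReadMethod.Hadoop)
metrics.bytesRead = 2100L

// Serializes to {"Data Read Method":"Hadoop","Bytes Read":2100}.
val json = JsonProtocol.inputMetricsToJson(metrics)
println(compact(render(json)))

// Deserializing restores both fields.
val restored = JsonProtocol.inputMetricsFromJson(json)
assert(restored.readMethod == metrics.readMethod)
assert(restored.bytesRead == metrics.bytesRead)
```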
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 4f178db40f..7f5d0b061e 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.mock.EasyMockSugar
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
@@ -66,7 +67,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
test("get cached rdd") {
expecting {
- blockManager.get(RDDBlockId(0, 0)).andReturn(Some(ArrayBuffer(5, 6, 7).iterator))
+ val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+ blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
}
whenExecuting(blockManager) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 6df0a08096..71f48e295e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -251,6 +251,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0l)
if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
+ taskMetrics.inputMetrics should not be ('defined)
taskMetrics.shuffleWriteMetrics should be ('defined)
taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d7dbe5164b..23cb6905bf 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,11 +31,13 @@ import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.executor.DataReadMethod
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
+import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.language.postfixOps
@@ -415,6 +417,39 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
}
+ test("correct BlockResult returned from get() calls") {
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr,
+ mapOutputTracker)
+ val list1 = List(new Array[Byte](200), new Array[Byte](200))
+ val list1ForSizeEstimate = new ArrayBuffer[Any]
+ list1ForSizeEstimate ++= list1.iterator
+ val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate)
+ val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150))
+ val list2ForSizeEstimate = new ArrayBuffer[Any]
+ list2ForSizeEstimate ++= list2.iterator
+ val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ val list1Get = store.get("list1")
+ assert(list1Get.isDefined, "list1 expected to be in store")
+ assert(list1Get.get.data.size === 2)
+ assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
+ assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+ val list2MemoryGet = store.get("list2memory")
+ assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
+ assert(list2MemoryGet.get.data.size === 3)
+ assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
+ assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+ val list2DiskGet = store.get("list2disk")
+ assert(list2DiskGet.isDefined, "list2memory expected to be in store")
+ assert(list2DiskGet.get.data.size === 3)
+ System.out.println(list2DiskGet)
+ // We don't know the exact size of the data on disk, but it should certainly be > 0.
+ assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
+ assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+ }
+
test("in-memory LRU storage") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
securityMgr, mapOutputTracker)
@@ -630,18 +665,18 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list2").get.size == 2)
+ assert(store.get("list2").get.data.size === 2)
assert(store.get("list3").isDefined, "list3 was not in store")
- assert(store.get("list3").get.size == 2)
+ assert(store.get("list3").get.data.size === 2)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list2").get.size == 2)
+ assert(store.get("list2").get.data.size === 2)
// At this point list2 was gotten last, so LRU will getSingle rid of list3
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list1").isDefined, "list1 was not in store")
- assert(store.get("list1").get.size == 2)
+ assert(store.get("list1").get.data.size === 2)
assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list2").get.size == 2)
+ assert(store.get("list2").get.data.size === 2)
assert(store.get("list3") === None, "list1 was in store")
}
@@ -656,28 +691,31 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ val listForSizeEstimate = new ArrayBuffer[Any]
+ listForSizeEstimate ++= list1.iterator
+ val listSize = SizeEstimator.estimate(listForSizeEstimate)
// At this point LRU should not kick in because list3 is only on disk
- assert(store.get("list1").isDefined, "list2 was not in store")
- assert(store.get("list1").get.size === 2)
- assert(store.get("list2").isDefined, "list3 was not in store")
- assert(store.get("list2").get.size === 2)
- assert(store.get("list3").isDefined, "list1 was not in store")
- assert(store.get("list3").get.size === 2)
- assert(store.get("list1").isDefined, "list2 was not in store")
- assert(store.get("list1").get.size === 2)
- assert(store.get("list2").isDefined, "list3 was not in store")
- assert(store.get("list2").get.size === 2)
- assert(store.get("list3").isDefined, "list1 was not in store")
- assert(store.get("list3").get.size === 2)
+ assert(store.get("list1").isDefined, "list1 was not in store")
+ assert(store.get("list1").get.data.size === 2)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list2").get.data.size === 2)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.get("list3").get.data.size === 2)
+ assert(store.get("list1").isDefined, "list1 was not in store")
+ assert(store.get("list1").get.data.size === 2)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list2").get.data.size === 2)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.get("list3").get.data.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.get("list1") === None, "list1 was in store")
- assert(store.get("list2").isDefined, "list3 was not in store")
- assert(store.get("list2").get.size === 2)
- assert(store.get("list3").isDefined, "list1 was not in store")
- assert(store.get("list3").get.size === 2)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list2").get.data.size === 2)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.get("list3").get.data.size === 2)
assert(store.get("list4").isDefined, "list4 was not in store")
- assert(store.get("list4").get.size === 2)
+ assert(store.get("list4").get.data.size === 2)
}
test("negative byte values in ByteBufferInputStream") {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6c49870455..316e14100e 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -39,7 +39,11 @@ class JsonProtocolSuite extends FunSuite {
val taskGettingResult =
SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
- makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+ makeTaskInfo(123L, 234, 67, 345L, false),
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false))
+ val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+ makeTaskInfo(123L, 234, 67, 345L, false),
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true))
val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -61,6 +65,7 @@ class JsonProtocolSuite extends FunSuite {
testEvent(taskStart, taskStartJsonString)
testEvent(taskGettingResult, taskGettingResultJsonString)
testEvent(taskEnd, taskEndJsonString)
+ testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString)
testEvent(jobStart, jobStartJsonString)
testEvent(jobEnd, jobEndJsonString)
testEvent(environmentUpdate, environmentUpdateJsonString)
@@ -75,7 +80,7 @@ class JsonProtocolSuite extends FunSuite {
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
- testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
+ testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
// StorageLevel
@@ -118,7 +123,7 @@ class JsonProtocolSuite extends FunSuite {
testBlockId(StreamBlockId(1, 2L))
}
- test("Backward compatibility") {
+ test("StageInfo.details backward compatibility") {
// StageInfo.details was added after 1.0.0.
val info = makeStageInfo(1, 2, 3, 4L, 5L)
assert(info.details.nonEmpty)
@@ -129,6 +134,16 @@ class JsonProtocolSuite extends FunSuite {
assert("" === newInfo.details)
}
+ test("InputMetrics backward compatibility") {
+ // InputMetrics were added after 1.0.1.
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
+ assert(metrics.inputMetrics.nonEmpty)
+ val newJson = JsonProtocol.taskMetricsToJson(metrics)
+ val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" }
+ val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+ assert(newMetrics.inputMetrics.isEmpty)
+ }
+
/** -------------------------- *
| Helper test running methods |
@@ -294,6 +309,8 @@ class JsonProtocolSuite extends FunSuite {
metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals)
assertOptionEquals(
metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals)
+ assertOptionEquals(
+ metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals)
assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals)
}
@@ -311,6 +328,11 @@ class JsonProtocolSuite extends FunSuite {
assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime)
}
+ private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
+ assert(metrics1.readMethod === metrics2.readMethod)
+ assert(metrics1.bytesRead === metrics2.bytesRead)
+ }
+
private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) {
assert(bm1.executorId === bm2.executorId)
assert(bm1.host === bm2.host)
@@ -403,6 +425,10 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(w1, w2)
}
+ private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) {
+ assertEquals(i1, i2)
+ }
+
private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) {
assertEquals(t1, t2)
}
@@ -460,9 +486,19 @@ class JsonProtocolSuite extends FunSuite {
new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
}
- private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
+ /**
+ * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
+ * set to true) or read data from a shuffle otherwise.
+ */
+ private def makeTaskMetrics(
+ a: Long,
+ b: Long,
+ c: Long,
+ d: Long,
+ e: Int,
+ f: Int,
+ hasHadoopInput: Boolean) = {
val t = new TaskMetrics
- val sr = new ShuffleReadMetrics
val sw = new ShuffleWriteMetrics
t.hostname = "localhost"
t.executorDeserializeTime = a
@@ -471,15 +507,23 @@ class JsonProtocolSuite extends FunSuite {
t.jvmGCTime = d
t.resultSerializationTime = a + b
t.memoryBytesSpilled = a + c
- sr.shuffleFinishTime = b + c
- sr.totalBlocksFetched = e + f
- sr.remoteBytesRead = b + d
- sr.localBlocksFetched = e
- sr.fetchWaitTime = a + d
- sr.remoteBlocksFetched = f
+
+ if (hasHadoopInput) {
+ val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ inputMetrics.bytesRead = d + e + f
+ t.inputMetrics = Some(inputMetrics)
+ } else {
+ val sr = new ShuffleReadMetrics
+ sr.shuffleFinishTime = b + c
+ sr.totalBlocksFetched = e + f
+ sr.remoteBytesRead = b + d
+ sr.localBlocksFetched = e
+ sr.fetchWaitTime = a + d
+ sr.remoteBlocksFetched = f
+ t.shuffleReadMetrics = Some(sr)
+ }
sw.shuffleBytesWritten = a + b + c
sw.shuffleWriteTime = b + c + d
- t.shuffleReadMetrics = Some(sr)
t.shuffleWriteMetrics = Some(sw)
// Make at most 6 blocks
t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
@@ -552,8 +596,9 @@ class JsonProtocolSuite extends FunSuite {
| },
| "Shuffle Write Metrics":{
| "Shuffle Bytes Written":1200,
- | "Shuffle Write Time":1500},
- | "Updated Blocks":[
+ | "Shuffle Write Time":1500
+ | },
+ | "Updated Blocks":[
| {"Block ID":"rdd_0_0",
| "Status":{
| "Storage Level":{
@@ -568,6 +613,35 @@ class JsonProtocolSuite extends FunSuite {
|}
""".stripMargin
+ private val taskEndWithHadoopInputJsonString =
+ """
+ |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
+ |"Task End Reason":{"Reason":"Success"},
+ |"Task Info":{
+ | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
+ | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
+ | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+ |},
+ |"Task Metrics":{
+ | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
+ | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
+ | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
+ | "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},
+ | "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100},
+ | "Updated Blocks":[
+ | {"Block ID":"rdd_0_0",
+ | "Status":{
+ | "Storage Level":{
+ | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
+ | "Replication":2
+ | },
+ | "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+ | }
+ | }
+ | ]}
+ |}
+ """
+
private val jobStartJsonString =
"""
{"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties":