author    Kostas Sakellis <kostas@cloudera.com>    2015-02-06 14:31:20 -0800
committer Patrick Wendell <patrick@databricks.com> 2015-02-06 14:31:20 -0800
commit    dcd1e42d6b6ac08d2c0736bf61a15f515a1f222b (patch)
tree      13b498d7cc6fff743fce1719aef50d716d7677f2 /core
parent    57961567ef104efb3174e67d762c5d9d6263b800 (diff)
[SPARK-4874] [CORE] Collect record count metrics
Collects record counts for both Input/Output and Shuffle Metrics. For the input/output metrics, the counter is incremented each time the iterator is accessed. For shuffle metrics, records are counted post-aggregation (after a map-side combine) on the write side and pre-aggregation on the read side, so the bytes read/written metrics and the records read/written metrics line up. For backwards compatibility, if we deserialize an older event that doesn't have record metrics, the metric is set to -1.

Author: Kostas Sakellis <kostas@cloudera.com>

Closes #4067 from ksakellis/kostas-spark-4874 and squashes the following commits:

bd919be [Kostas Sakellis] Changed 'Records Read' in shuffleReadMetrics json output to 'Total Records Read'
dad4d57 [Kostas Sakellis] Add a comment and check to BlockObjectWriter so that it cannot be reopend.
6f236a1 [Kostas Sakellis] Renamed _recordsWritten in ShuffleWriteMetrics to be more consistent
70620a0 [Kostas Sakellis] CR Feedback
17faa3a [Kostas Sakellis] Removed AtomicLong in favour of using Long
b6f9923 [Kostas Sakellis] Merge AfterNextInterceptingIterator with InterruptableIterator to save a function call
46c8186 [Kostas Sakellis] Combined Bytes and # records into one column
57551c1 [Kostas Sakellis] Conforms to SPARK-3288
6cdb44e [Kostas Sakellis] Removed the generic InterceptingIterator and repalced it with specific implementation
1aa273c [Kostas Sakellis] CR Feedback
1bb78b1 [Kostas Sakellis] [SPARK-4874] [CORE] Collect record count metrics
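The counting approach described above can be illustrated with a minimal, self-contained Scala sketch (the names RecordCountingIterator and recordsRead below are illustrative only, not Spark's actual API): an iterator wrapper bumps a counter on every next() call, so records are counted lazily as the data is consumed, which mirrors how the patch wraps the returned iterators instead of materializing the data.

    // Illustrative sketch: count records as an iterator is consumed.
    class RecordCountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
      // Running count of records handed out by this iterator.
      private var _recordsRead: Long = 0L
      def recordsRead: Long = _recordsRead

      override def hasNext: Boolean = underlying.hasNext
      override def next(): T = {
        _recordsRead += 1          // count each record as the caller pulls it
        underlying.next()
      }
    }

    object RecordCountingIteratorExample {
      def main(args: Array[String]): Unit = {
        val iter = new RecordCountingIterator(Iterator("a", "b", "c"))
        iter.foreach(_ => ())                         // consume all records
        println(s"records read: ${iter.recordsRead}") // prints: records read: 3
      }
    }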
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala  11
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala  54
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala  6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala  6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala  12
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala  8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala  24
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/ToolTips.scala  9
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala  6
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala  86
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala  24
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala  148
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala  14
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala  19
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala  1
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala  186
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala  21
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala  55
20 files changed, 548 insertions, 146 deletions
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index a0c0372b7f..a96d754744 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -47,10 +47,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(inputMetrics.readMethod)
- existingMetrics.addBytesRead(inputMetrics.bytesRead)
-
- new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
+ existingMetrics.incBytesRead(inputMetrics.bytesRead)
+ val iter = blockResult.data.asInstanceOf[Iterator[T]]
+ new InterruptibleIterator[T](context, iter) {
+ override def next(): T = {
+ existingMetrics.incRecordsRead(1)
+ delegate.next()
+ }
+ }
case None =>
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish return its results
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 97912c68c5..d05659193b 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -194,18 +194,19 @@ class TaskMetrics extends Serializable {
/**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
- private[spark] def updateShuffleReadMetrics() = synchronized {
+ private[spark] def updateShuffleReadMetrics(): Unit = synchronized {
val merged = new ShuffleReadMetrics()
for (depMetrics <- depsShuffleReadMetrics) {
merged.incFetchWaitTime(depMetrics.fetchWaitTime)
merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
+ merged.incRecordsRead(depMetrics.recordsRead)
}
_shuffleReadMetrics = Some(merged)
}
- private[spark] def updateInputMetrics() = synchronized {
+ private[spark] def updateInputMetrics(): Unit = synchronized {
inputMetrics.foreach(_.updateBytesRead())
}
}
@@ -242,27 +243,31 @@ object DataWriteMethod extends Enumeration with Serializable {
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {
- private val _bytesRead: AtomicLong = new AtomicLong()
+ /**
+ * This is volatile so that it is visible to the updater thread.
+ */
+ @volatile @transient var bytesReadCallback: Option[() => Long] = None
/**
* Total bytes read.
*/
- def bytesRead: Long = _bytesRead.get()
- @volatile @transient var bytesReadCallback: Option[() => Long] = None
+ private var _bytesRead: Long = _
+ def bytesRead: Long = _bytesRead
+ def incBytesRead(bytes: Long) = _bytesRead += bytes
/**
- * Adds additional bytes read for this read method.
+ * Total records read.
*/
- def addBytesRead(bytes: Long) = {
- _bytesRead.addAndGet(bytes)
- }
+ private var _recordsRead: Long = _
+ def recordsRead: Long = _recordsRead
+ def incRecordsRead(records: Long) = _recordsRead += records
/**
* Invoke the bytesReadCallback and mutate bytesRead.
*/
def updateBytesRead() {
bytesReadCallback.foreach { c =>
- _bytesRead.set(c())
+ _bytesRead = c()
}
}
@@ -287,6 +292,13 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
private var _bytesWritten: Long = _
def bytesWritten = _bytesWritten
private[spark] def setBytesWritten(value : Long) = _bytesWritten = value
+
+ /**
+ * Total records written
+ */
+ private var _recordsWritten: Long = 0L
+ def recordsWritten = _recordsWritten
+ private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value
}
/**
@@ -301,7 +313,7 @@ class ShuffleReadMetrics extends Serializable {
private var _remoteBlocksFetched: Int = _
def remoteBlocksFetched = _remoteBlocksFetched
private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
- private[spark] def defRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
+ private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
/**
* Number of local blocks fetched in this shuffle by this task
@@ -309,8 +321,7 @@ class ShuffleReadMetrics extends Serializable {
private var _localBlocksFetched: Int = _
def localBlocksFetched = _localBlocksFetched
private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
- private[spark] def defLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
-
+ private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
/**
* Time the task spent waiting for remote shuffle blocks. This only includes the time
@@ -334,6 +345,14 @@ class ShuffleReadMetrics extends Serializable {
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched
+
+ /**
+ * Total number of records read from the shuffle by this task
+ */
+ private var _recordsRead: Long = _
+ def recordsRead = _recordsRead
+ private[spark] def incRecordsRead(value: Long) = _recordsRead += value
+ private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
}
/**
@@ -358,5 +377,12 @@ class ShuffleWriteMetrics extends Serializable {
private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
-
+ /**
+ * Total number of records written to the shuffle by this task
+ */
+ @volatile private var _shuffleRecordsWritten: Long = _
+ def shuffleRecordsWritten = _shuffleRecordsWritten
+ private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
+ private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
+ private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 89adddcf0a..486e86ce1b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -247,7 +247,9 @@ class HadoopRDD[K, V](
case eof: EOFException =>
finished = true
}
-
+ if (!finished) {
+ inputMetrics.incRecordsRead(1)
+ }
(key, value)
}
@@ -261,7 +263,7 @@ class HadoopRDD[K, V](
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.addBytesRead(split.inputSplit.value.getLength)
+ inputMetrics.incBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 44b9ffd2a5..7fb94840df 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -151,7 +151,9 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
-
+ if (!finished) {
+ inputMetrics.incRecordsRead(1)
+ }
(reader.getCurrentKey, reader.getCurrentValue)
}
@@ -165,7 +167,7 @@ class NewHadoopRDD[K, V](
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength)
+ inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 49b88a90ab..955b42c3ba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
-RecordWriter => NewRecordWriter}
+ RecordWriter => NewRecordWriter}
import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
@@ -993,8 +993,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+ var recordsWritten = 0L
try {
- var recordsWritten = 0L
while (iter.hasNext) {
val pair = iter.next()
writer.write(pair._1, pair._2)
@@ -1008,6 +1008,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
committer.commitTask(hadoopContext)
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
+ outputMetrics.setRecordsWritten(recordsWritten)
1
} : Int
@@ -1065,8 +1066,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
+ var recordsWritten = 0L
try {
- var recordsWritten = 0L
while (iter.hasNext) {
val record = iter.next()
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
@@ -1080,6 +1081,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
writer.commit()
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
+ outputMetrics.setRecordsWritten(recordsWritten)
}
self.context.runJob(self, writeToFile)
@@ -1097,9 +1099,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long],
outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
- if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
- && bytesWrittenCallback.isDefined) {
+ if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
+ outputMetrics.setRecordsWritten(recordsWritten)
}
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index e3e7434df4..7a2c5ae32d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -86,6 +86,12 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
context.taskMetrics.updateShuffleReadMetrics()
})
- new InterruptibleIterator[T](context, completionIter)
+ new InterruptibleIterator[T](context, completionIter) {
+ val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
+ override def next(): T = {
+ readMetrics.incRecordsRead(1)
+ delegate.next()
+ }
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 8bc5a1cd18..86dbd89f0f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -53,7 +53,7 @@ private[spark] class BlockResult(
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
- inputMetrics.addBytesRead(bytes)
+ inputMetrics.incBytesRead(bytes)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 3198d766fc..81164178b9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -29,7 +29,8 @@ import org.apache.spark.executor.ShuffleWriteMetrics
* appending data to an existing block, and can guarantee atomicity in the case of faults
* as it allows the caller to revert partial writes.
*
- * This interface does not support concurrent writes.
+ * This interface does not support concurrent writes. Also, once the writer has
+ * been opened, it cannot be reopened again.
*/
private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
@@ -95,6 +96,7 @@ private[spark] class DiskBlockObjectWriter(
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private var initialized = false
+ private var hasBeenClosed = false
/**
* Cursors used to represent positions in the file.
@@ -115,11 +117,16 @@ private[spark] class DiskBlockObjectWriter(
private var finalPosition: Long = -1
private var reportedPosition = initialPosition
- /** Calling channel.position() to update the write metrics can be a little bit expensive, so we
- * only call it every N writes */
- private var writesSinceMetricsUpdate = 0
+ /**
+ * Keep track of number of records written and also use this to periodically
+ * output bytes written since the latter is expensive to do for each record.
+ */
+ private var numRecordsWritten = 0
override def open(): BlockObjectWriter = {
+ if (hasBeenClosed) {
+ throw new IllegalStateException("Writer already closed. Cannot be reopened.")
+ }
fos = new FileOutputStream(file, true)
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
@@ -145,6 +152,7 @@ private[spark] class DiskBlockObjectWriter(
ts = null
objOut = null
initialized = false
+ hasBeenClosed = true
}
}
@@ -168,6 +176,7 @@ private[spark] class DiskBlockObjectWriter(
override def revertPartialWritesAndClose() {
try {
writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
+ writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
if (initialized) {
objOut.flush()
@@ -193,12 +202,11 @@ private[spark] class DiskBlockObjectWriter(
}
objOut.writeObject(value)
+ numRecordsWritten += 1
+ writeMetrics.incShuffleRecordsWritten(1)
- if (writesSinceMetricsUpdate == 32) {
- writesSinceMetricsUpdate = 0
+ if (numRecordsWritten % 32 == 0) {
updateBytesWritten()
- } else {
- writesSinceMetricsUpdate += 1
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index 4307029d44..3a15e603b1 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -29,14 +29,15 @@ private[spark] object ToolTips {
val SHUFFLE_READ_BLOCKED_TIME =
"Time that the task spent blocked waiting for shuffle data to be read from remote machines."
- val INPUT = "Bytes read from Hadoop or from Spark storage."
+ val INPUT = "Bytes and records read from Hadoop or from Spark storage."
- val OUTPUT = "Bytes written to Hadoop."
+ val OUTPUT = "Bytes and records written to Hadoop."
- val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
+ val SHUFFLE_WRITE =
+ "Bytes and records written to disk in order to be read by a shuffle in a future stage."
val SHUFFLE_READ =
- """Bytes read from remote executors. Typically less than shuffle write bytes
+ """Bytes and records read from remote executors. Typically less than shuffle write bytes
because this does not include shuffle data read locally."""
val GETTING_RESULT_TIME =
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index a38cb75fdd..3afd7ef07d 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -48,7 +48,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
val executorToInputBytes = HashMap[String, Long]()
+ val executorToInputRecords = HashMap[String, Long]()
val executorToOutputBytes = HashMap[String, Long]()
+ val executorToOutputRecords = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
val executorToLogUrls = HashMap[String, Map[String, String]]()
@@ -84,10 +86,14 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
metrics.inputMetrics.foreach { inputMetrics =>
executorToInputBytes(eid) =
executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
+ executorToInputRecords(eid) =
+ executorToInputRecords.getOrElse(eid, 0L) + inputMetrics.recordsRead
}
metrics.outputMetrics.foreach { outputMetrics =>
executorToOutputBytes(eid) =
executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten
+ executorToOutputRecords(eid) =
+ executorToOutputRecords.getOrElse(eid, 0L) + outputMetrics.recordsWritten
}
metrics.shuffleReadMetrics.foreach { shuffleRead =>
executorToShuffleRead(eid) =
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 9836d11a6d..1f8536d1b7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -36,6 +36,20 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
/** Special table which merges two header cells. */
private def executorTable[T](): Seq[Node] = {
+ val stageData = listener.stageIdToData.get((stageId, stageAttemptId))
+ var hasInput = false
+ var hasOutput = false
+ var hasShuffleWrite = false
+ var hasShuffleRead = false
+ var hasBytesSpilled = false
+ stageData.foreach(data => {
+ hasInput = data.hasInput
+ hasOutput = data.hasOutput
+ hasShuffleRead = data.hasShuffleRead
+ hasShuffleWrite = data.hasShuffleWrite
+ hasBytesSpilled = data.hasBytesSpilled
+ })
+
<table class={UIUtils.TABLE_CLASS_STRIPED}>
<thead>
<th>Executor ID</th>
@@ -44,12 +58,32 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
<th>Total Tasks</th>
<th>Failed Tasks</th>
<th>Succeeded Tasks</th>
- <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
- <th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th>
- <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
- <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>Shuffle Write</span></th>
- <th>Shuffle Spill (Memory)</th>
- <th>Shuffle Spill (Disk)</th>
+ {if (hasInput) {
+ <th>
+ <span data-toggle="tooltip" title={ToolTips.INPUT}>Input Size / Records</span>
+ </th>
+ }}
+ {if (hasOutput) {
+ <th>
+ <span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output Size / Records</span>
+ </th>
+ }}
+ {if (hasShuffleRead) {
+ <th>
+ <span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>
+ Shuffle Read Size / Records</span>
+ </th>
+ }}
+ {if (hasShuffleWrite) {
+ <th>
+ <span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>
+ Shuffle Write Size / Records</span>
+ </th>
+ }}
+ {if (hasBytesSpilled) {
+ <th>Shuffle Spill (Memory)</th>
+ <th>Shuffle Spill (Disk)</th>
+ }}
</thead>
<tbody>
{createExecutorTable()}
@@ -76,18 +110,34 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
- <td sorttable_customkey={v.inputBytes.toString}>
- {Utils.bytesToString(v.inputBytes)}</td>
- <td sorttable_customkey={v.outputBytes.toString}>
- {Utils.bytesToString(v.outputBytes)}</td>
- <td sorttable_customkey={v.shuffleRead.toString}>
- {Utils.bytesToString(v.shuffleRead)}</td>
- <td sorttable_customkey={v.shuffleWrite.toString}>
- {Utils.bytesToString(v.shuffleWrite)}</td>
- <td sorttable_customkey={v.memoryBytesSpilled.toString}>
- {Utils.bytesToString(v.memoryBytesSpilled)}</td>
- <td sorttable_customkey={v.diskBytesSpilled.toString}>
- {Utils.bytesToString(v.diskBytesSpilled)}</td>
+ {if (stageData.hasInput) {
+ <td sorttable_customkey={v.inputBytes.toString}>
+ {s"${Utils.bytesToString(v.inputBytes)} / ${v.inputRecords}"}
+ </td>
+ }}
+ {if (stageData.hasOutput) {
+ <td sorttable_customkey={v.outputBytes.toString}>
+ {s"${Utils.bytesToString(v.outputBytes)} / ${v.outputRecords}"}
+ </td>
+ }}
+ {if (stageData.hasShuffleRead) {
+ <td sorttable_customkey={v.shuffleRead.toString}>
+ {s"${Utils.bytesToString(v.shuffleRead)} / ${v.shuffleReadRecords}"}
+ </td>
+ }}
+ {if (stageData.hasShuffleWrite) {
+ <td sorttable_customkey={v.shuffleWrite.toString}>
+ {s"${Utils.bytesToString(v.shuffleWrite)} / ${v.shuffleWriteRecords}"}
+ </td>
+ }}
+ {if (stageData.hasBytesSpilled) {
+ <td sorttable_customkey={v.memoryBytesSpilled.toString}>
+ {Utils.bytesToString(v.memoryBytesSpilled)}
+ </td>
+ <td sorttable_customkey={v.diskBytesSpilled.toString}>
+ {Utils.bytesToString(v.diskBytesSpilled)}
+ </td>
+ }}
</tr>
}
case None =>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 4d200eeda8..f463f8d7c7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -394,24 +394,48 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData.shuffleWriteBytes += shuffleWriteDelta
execSummary.shuffleWrite += shuffleWriteDelta
+ val shuffleWriteRecordsDelta =
+ (taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L)
+ - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleRecordsWritten).getOrElse(0L))
+ stageData.shuffleWriteRecords += shuffleWriteRecordsDelta
+ execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta
+
val shuffleReadDelta =
(taskMetrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L))
stageData.shuffleReadBytes += shuffleReadDelta
execSummary.shuffleRead += shuffleReadDelta
+ val shuffleReadRecordsDelta =
+ (taskMetrics.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L)
+ - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.recordsRead).getOrElse(0L))
+ stageData.shuffleReadRecords += shuffleReadRecordsDelta
+ execSummary.shuffleReadRecords += shuffleReadRecordsDelta
+
val inputBytesDelta =
(taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
- oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
stageData.inputBytes += inputBytesDelta
execSummary.inputBytes += inputBytesDelta
+ val inputRecordsDelta =
+ (taskMetrics.inputMetrics.map(_.recordsRead).getOrElse(0L)
+ - oldMetrics.flatMap(_.inputMetrics).map(_.recordsRead).getOrElse(0L))
+ stageData.inputRecords += inputRecordsDelta
+ execSummary.inputRecords += inputRecordsDelta
+
val outputBytesDelta =
(taskMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L)
- oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L))
stageData.outputBytes += outputBytesDelta
execSummary.outputBytes += outputBytesDelta
+ val outputRecordsDelta =
+ (taskMetrics.outputMetrics.map(_.recordsWritten).getOrElse(0L)
+ - oldMetrics.flatMap(_.outputMetrics).map(_.recordsWritten).getOrElse(0L))
+ stageData.outputRecords += outputRecordsDelta
+ execSummary.outputRecords += outputRecordsDelta
+
val diskSpillDelta =
taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
stageData.diskBytesSpilled += diskSpillDelta
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index d8be1b20b3..02a3cc3e43 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -56,11 +56,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val numCompleted = tasks.count(_.taskInfo.finished)
val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables
val hasAccumulators = accumulables.size > 0
- val hasInput = stageData.inputBytes > 0
- val hasOutput = stageData.outputBytes > 0
- val hasShuffleRead = stageData.shuffleReadBytes > 0
- val hasShuffleWrite = stageData.shuffleWriteBytes > 0
- val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0
val summary =
<div>
@@ -69,31 +64,33 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<strong>Total task time across all tasks: </strong>
{UIUtils.formatDuration(stageData.executorRunTime)}
</li>
- {if (hasInput) {
+ {if (stageData.hasInput) {
<li>
- <strong>Input: </strong>
- {Utils.bytesToString(stageData.inputBytes)}
+ <strong>Input Size / Records: </strong>
+ {s"${Utils.bytesToString(stageData.inputBytes)} / ${stageData.inputRecords}"}
</li>
}}
- {if (hasOutput) {
+ {if (stageData.hasOutput) {
<li>
<strong>Output: </strong>
- {Utils.bytesToString(stageData.outputBytes)}
+ {s"${Utils.bytesToString(stageData.outputBytes)} / ${stageData.outputRecords}"}
</li>
}}
- {if (hasShuffleRead) {
+ {if (stageData.hasShuffleRead) {
<li>
<strong>Shuffle read: </strong>
- {Utils.bytesToString(stageData.shuffleReadBytes)}
+ {s"${Utils.bytesToString(stageData.shuffleReadBytes)} / " +
+ s"${stageData.shuffleReadRecords}"}
</li>
}}
- {if (hasShuffleWrite) {
+ {if (stageData.hasShuffleWrite) {
<li>
<strong>Shuffle write: </strong>
- {Utils.bytesToString(stageData.shuffleWriteBytes)}
+ {s"${Utils.bytesToString(stageData.shuffleWriteBytes)} / " +
+ s"${stageData.shuffleWriteRecords}"}
</li>
}}
- {if (hasBytesSpilled) {
+ {if (stageData.hasBytesSpilled) {
<li>
<strong>Shuffle spill (memory): </strong>
{Utils.bytesToString(stageData.memoryBytesSpilled)}
@@ -132,7 +129,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<span class="additional-metric-title">Task Deserialization Time</span>
</span>
</li>
- {if (hasShuffleRead) {
+ {if (stageData.hasShuffleRead) {
<li>
<span data-toggle="tooltip"
title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right">
@@ -174,25 +171,32 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
{if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
- {if (hasInput) Seq(("Input", "")) else Nil} ++
- {if (hasOutput) Seq(("Output", "")) else Nil} ++
- {if (hasShuffleRead) {
+ {if (stageData.hasInput) Seq(("Input Size / Records", "")) else Nil} ++
+ {if (stageData.hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
+ {if (stageData.hasShuffleRead) {
Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
- ("Shuffle Read", ""))
+ ("Shuffle Read Size / Records", ""))
+ } else {
+ Nil
+ }} ++
+ {if (stageData.hasShuffleWrite) {
+ Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
+ } else {
+ Nil
+ }} ++
+ {if (stageData.hasBytesSpilled) {
+ Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
} else {
Nil
}} ++
- {if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write", "")) else Nil} ++
- {if (hasBytesSpilled) Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
- else Nil} ++
Seq(("Errors", ""))
val unzipped = taskHeadersAndCssClasses.unzip
val taskTable = UIUtils.listingTable(
unzipped._1,
- taskRow(hasAccumulators, hasInput, hasOutput, hasShuffleRead, hasShuffleWrite,
- hasBytesSpilled),
+ taskRow(hasAccumulators, stageData.hasInput, stageData.hasOutput,
+ stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled),
tasks,
headerClasses = unzipped._2)
// Excludes tasks which failed and have incomplete metrics
@@ -203,8 +207,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
None
}
else {
+ def getDistributionQuantiles(data: Seq[Double]): IndexedSeq[Double] =
+ Distribution(data).get.getQuantiles()
+
def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = {
- Distribution(times).get.getQuantiles().map { millis =>
+ getDistributionQuantiles(times).map { millis =>
<td>{UIUtils.formatDuration(millis.toLong)}</td>
}
}
@@ -273,17 +280,36 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getFormattedTimeQuantiles(schedulerDelays)
def getFormattedSizeQuantiles(data: Seq[Double]) =
- Distribution(data).get.getQuantiles().map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
+ getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
+
+ def getFormattedSizeQuantilesWithRecords(data: Seq[Double], records: Seq[Double]) = {
+ val recordDist = getDistributionQuantiles(records).iterator
+ getDistributionQuantiles(data).map(d =>
+ <td>{s"${Utils.bytesToString(d.toLong)} / ${recordDist.next().toLong}"}</td>
+ )
+ }
val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
}
- val inputQuantiles = <td>Input</td> +: getFormattedSizeQuantiles(inputSizes)
+
+ val inputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+ }
+
+ val inputQuantiles = <td>Input Size / Records</td> +:
+ getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)
val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}
- val outputQuantiles = <td>Output</td> +: getFormattedSizeQuantiles(outputSizes)
+
+ val outputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
+ }
+
+ val outputQuantiles = <td>Output Size / Records</td> +:
+ getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords)
val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
@@ -294,14 +320,24 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
- val shuffleReadQuantiles = <td>Shuffle Read (Remote)</td> +:
- getFormattedSizeQuantiles(shuffleReadSizes)
+
+ val shuffleReadRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+ }
+
+ val shuffleReadQuantiles = <td>Shuffle Read Size / Records (Remote)</td> +:
+ getFormattedSizeQuantilesWithRecords(shuffleReadSizes, shuffleReadRecords)
val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
}
- val shuffleWriteQuantiles = <td>Shuffle Write</td> +:
- getFormattedSizeQuantiles(shuffleWriteSizes)
+
+ val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L).toDouble
+ }
+
+ val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
+ getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords)
val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.memoryBytesSpilled.toDouble
@@ -326,9 +362,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{serializationQuantiles}
</tr>,
<tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
- if (hasInput) <tr>{inputQuantiles}</tr> else Nil,
- if (hasOutput) <tr>{outputQuantiles}</tr> else Nil,
- if (hasShuffleRead) {
+ if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil,
+ if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil,
+ if (stageData.hasShuffleRead) {
<tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
{shuffleReadBlockedQuantiles}
</tr>
@@ -336,9 +372,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
} else {
Nil
},
- if (hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil,
- if (hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
- if (hasBytesSpilled) <tr>{diskBytesSpilledQuantiles}</tr> else Nil)
+ if (stageData.hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil,
+ if (stageData.hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
+ if (stageData.hasBytesSpilled) <tr>{diskBytesSpilledQuantiles}</tr> else Nil)
val quantileHeaders = Seq("Metric", "Min", "25th percentile",
"Median", "75th percentile", "Max")
@@ -397,26 +433,32 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val inputReadable = maybeInput
.map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
.getOrElse("")
+ val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
val maybeOutput = metrics.flatMap(_.outputMetrics)
val outputSortable = maybeOutput.map(_.bytesWritten.toString).getOrElse("")
val outputReadable = maybeOutput
.map(m => s"${Utils.bytesToString(m.bytesWritten)}")
.getOrElse("")
+ val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("")
- val maybeShuffleReadBlockedTime = metrics.flatMap(_.shuffleReadMetrics).map(_.fetchWaitTime)
- val shuffleReadBlockedTimeSortable = maybeShuffleReadBlockedTime.map(_.toString).getOrElse("")
+ val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
+ val shuffleReadBlockedTimeSortable = maybeShuffleRead
+ .map(_.fetchWaitTime.toString).getOrElse("")
val shuffleReadBlockedTimeReadable =
- maybeShuffleReadBlockedTime.map(ms => UIUtils.formatDuration(ms)).getOrElse("")
+ maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
- val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
- val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
- val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
+ val shuffleReadSortable = maybeShuffleRead.map(_.remoteBytesRead.toString).getOrElse("")
+ val shuffleReadReadable = maybeShuffleRead
+ .map(m => s"${Utils.bytesToString(m.remoteBytesRead)}").getOrElse("")
+ val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
- val maybeShuffleWrite =
- metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten)
- val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("")
- val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("")
+ val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
+ val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
+ val shuffleWriteReadable = maybeShuffleWrite
+ .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
+ val shuffleWriteRecords = maybeShuffleWrite
+ .map(_.shuffleRecordsWritten.toString).getOrElse("")
val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
@@ -472,12 +514,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
}}
{if (hasInput) {
<td sorttable_customkey={inputSortable}>
- {inputReadable}
+ {s"$inputReadable / $inputRecords"}
</td>
}}
{if (hasOutput) {
<td sorttable_customkey={outputSortable}>
- {outputReadable}
+ {s"$outputReadable / $outputRecords"}
</td>
}}
{if (hasShuffleRead) {
@@ -486,7 +528,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{shuffleReadBlockedTimeReadable}
</td>
<td sorttable_customkey={shuffleReadSortable}>
- {shuffleReadReadable}
+ {s"$shuffleReadReadable / $shuffleReadRecords"}
</td>
}}
{if (hasShuffleWrite) {
@@ -494,7 +536,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{writeTimeReadable}
</td>
<td sorttable_customkey={shuffleWriteSortable}>
- {shuffleWriteReadable}
+ {s"$shuffleWriteReadable / $shuffleWriteRecords"}
</td>
}}
{if (hasBytesSpilled) {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 01f7e23212..69aac6c862 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -31,9 +31,13 @@ private[jobs] object UIData {
var failedTasks : Int = 0
var succeededTasks : Int = 0
var inputBytes : Long = 0
+ var inputRecords : Long = 0
var outputBytes : Long = 0
+ var outputRecords : Long = 0
var shuffleRead : Long = 0
+ var shuffleReadRecords : Long = 0
var shuffleWrite : Long = 0
+ var shuffleWriteRecords : Long = 0
var memoryBytesSpilled : Long = 0
var diskBytesSpilled : Long = 0
}
@@ -73,9 +77,13 @@ private[jobs] object UIData {
var executorRunTime: Long = _
var inputBytes: Long = _
+ var inputRecords: Long = _
var outputBytes: Long = _
+ var outputRecords: Long = _
var shuffleReadBytes: Long = _
+ var shuffleReadRecords : Long = _
var shuffleWriteBytes: Long = _
+ var shuffleWriteRecords: Long = _
var memoryBytesSpilled: Long = _
var diskBytesSpilled: Long = _
@@ -85,6 +93,12 @@ private[jobs] object UIData {
var accumulables = new HashMap[Long, AccumulableInfo]
var taskData = new HashMap[Long, TaskUIData]
var executorSummary = new HashMap[String, ExecutorSummary]
+
+ def hasInput = inputBytes > 0
+ def hasOutput = outputBytes > 0
+ def hasShuffleRead = shuffleReadBytes > 0
+ def hasShuffleWrite = shuffleWriteBytes > 0
+ def hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
}
/**
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c8407bbcb7..b0b545640f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -293,22 +293,26 @@ private[spark] object JsonProtocol {
("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
- ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead)
+ ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~
+ ("Total Records Read" -> shuffleReadMetrics.recordsRead)
}
def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = {
("Shuffle Bytes Written" -> shuffleWriteMetrics.shuffleBytesWritten) ~
- ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime)
+ ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) ~
+ ("Shuffle Records Written" -> shuffleWriteMetrics.shuffleRecordsWritten)
}
def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
("Data Read Method" -> inputMetrics.readMethod.toString) ~
- ("Bytes Read" -> inputMetrics.bytesRead)
+ ("Bytes Read" -> inputMetrics.bytesRead) ~
+ ("Records Read" -> inputMetrics.recordsRead)
}
def outputMetricsToJson(outputMetrics: OutputMetrics): JValue = {
("Data Write Method" -> outputMetrics.writeMethod.toString) ~
- ("Bytes Written" -> outputMetrics.bytesWritten)
+ ("Bytes Written" -> outputMetrics.bytesWritten) ~
+ ("Records Written" -> outputMetrics.recordsWritten)
}
def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
@@ -670,6 +674,7 @@ private[spark] object JsonProtocol {
metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
+ metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0))
metrics
}
@@ -677,13 +682,16 @@ private[spark] object JsonProtocol {
val metrics = new ShuffleWriteMetrics
metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long])
+ metrics.setShuffleRecordsWritten((json \ "Shuffle Records Written")
+ .extractOpt[Long].getOrElse(0))
metrics
}
def inputMetricsFromJson(json: JValue): InputMetrics = {
val metrics = new InputMetrics(
DataReadMethod.withName((json \ "Data Read Method").extract[String]))
- metrics.addBytesRead((json \ "Bytes Read").extract[Long])
+ metrics.incBytesRead((json \ "Bytes Read").extract[Long])
+ metrics.incRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(0))
metrics
}
@@ -691,6 +699,7 @@ private[spark] object JsonProtocol {
val metrics = new OutputMetrics(
DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
metrics.setBytesWritten((json \ "Bytes Written").extract[Long])
+ metrics.setRecordsWritten((json \ "Records Written").extractOpt[Long].getOrElse(0))
metrics
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 6ba03841f7..eaec5a71e6 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -763,6 +763,7 @@ private[spark] class ExternalSorter[K, V, C](
if (curWriteMetrics != null) {
m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten)
m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime)
+ m.incShuffleRecordsWritten(curWriteMetrics.shuffleRecordsWritten)
}
}
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 81db66ae17..78fa98a3b9 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -21,44 +21,46 @@ import java.io.{File, FileWriter, PrintWriter}
import scala.collection.mutable.ArrayBuffer
-import org.scalatest.FunSuite
-
+import org.apache.commons.lang.math.RandomUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => OldInputSplit, JobConf,
- LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader, Reporter,
- TextInputFormat => OldTextInputFormat}
import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => OldCombineFileInputFormat,
- CombineFileSplit => OldCombineFileSplit, CombineFileRecordReader => OldCombineFileRecordReader}
-import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader => NewRecordReader,
- TaskAttemptContext}
+ CombineFileRecordReader => OldCombineFileRecordReader, CombineFileSplit => OldCombineFileSplit}
+import org.apache.hadoop.mapred.{JobConf, Reporter, FileSplit => OldFileSplit,
+ InputSplit => OldInputSplit, LineRecordReader => OldLineRecordReader,
+ RecordReader => OldRecordReader, TextInputFormat => OldTextInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => NewCombineFileInputFormat,
CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => NewCombineFileSplit,
FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.apache.hadoop.mapreduce.{TaskAttemptContext, InputSplit => NewInputSplit,
+ RecordReader => NewRecordReader}
+import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.SharedSparkContext
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.util.Utils
-class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
+class InputOutputMetricsSuite extends FunSuite with SharedSparkContext
+ with BeforeAndAfter {
@transient var tmpDir: File = _
@transient var tmpFile: File = _
@transient var tmpFilePath: String = _
+ @transient val numRecords: Int = 100000
+ @transient val numBuckets: Int = 10
- override def beforeAll() {
- super.beforeAll()
-
+ before {
tmpDir = Utils.createTempDir()
val testTempDir = new File(tmpDir, "test")
testTempDir.mkdir()
tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
val pw = new PrintWriter(new FileWriter(tmpFile))
- for (x <- 1 to 1000000) {
- pw.println("s")
+ for (x <- 1 to numRecords) {
+ pw.println(RandomUtils.nextInt(numBuckets))
}
pw.close()
@@ -66,8 +68,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
tmpFilePath = "file://" + tmpFile.getAbsolutePath
}
- override def afterAll() {
- super.afterAll()
+ after {
Utils.deleteRecursively(tmpDir)
}
@@ -155,6 +156,101 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
assert(bytesRead >= tmpFile.length())
}
+ test("input metrics on records read - simple") {
+ val records = runAndReturnRecordsRead {
+ sc.textFile(tmpFilePath, 4).count()
+ }
+ assert(records == numRecords)
+ }
+
+ test("input metrics on records read - more stages") {
+ val records = runAndReturnRecordsRead {
+ sc.textFile(tmpFilePath, 4)
+ .map(key => (key.length, 1))
+ .reduceByKey(_ + _)
+ .count()
+ }
+ assert(records == numRecords)
+ }
+
+ test("input metrics on records - New Hadoop API") {
+ val records = runAndReturnRecordsRead {
+ sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
+ classOf[Text]).count()
+ }
+ assert(records == numRecords)
+ }
+
+ test("input metrics on recordsd read with cache") {
+ // prime the cache manager
+ val rdd = sc.textFile(tmpFilePath, 4).cache()
+ rdd.collect()
+
+ val records = runAndReturnRecordsRead {
+ rdd.count()
+ }
+
+ assert(records == numRecords)
+ }
+
+ test("shuffle records read metrics") {
+ val recordsRead = runAndReturnShuffleRecordsRead {
+ sc.textFile(tmpFilePath, 4)
+ .map(key => (key, 1))
+ .groupByKey()
+ .collect()
+ }
+ assert(recordsRead == numRecords)
+ }
+
+ test("shuffle records written metrics") {
+ val recordsWritten = runAndReturnShuffleRecordsWritten {
+ sc.textFile(tmpFilePath, 4)
+ .map(key => (key, 1))
+ .groupByKey()
+ .collect()
+ }
+ assert(recordsWritten == numRecords)
+ }
+
+ /**
+ * Tests the metrics from end to end.
+ * 1) reading a hadoop file
+ * 2) shuffle and writing to a hadoop file.
+ * 3) writing to hadoop file.
+ */
+ test("input read/write and shuffle read/write metrics all line up") {
+ var inputRead = 0L
+ var outputWritten = 0L
+ var shuffleRead = 0L
+ var shuffleWritten = 0L
+ sc.addSparkListener(new SparkListener() {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val metrics = taskEnd.taskMetrics
+ metrics.inputMetrics.foreach(inputRead += _.recordsRead)
+ metrics.outputMetrics.foreach(outputWritten += _.recordsWritten)
+ metrics.shuffleReadMetrics.foreach(shuffleRead += _.recordsRead)
+ metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.shuffleRecordsWritten)
+ }
+ })
+
+ val tmpFile = new File(tmpDir, getClass.getSimpleName)
+
+ sc.textFile(tmpFilePath, 4)
+ .map(key => (key, 1))
+ .reduceByKey(_+_)
+ .saveAsTextFile("file://" + tmpFile.getAbsolutePath)
+
+ sc.listenerBus.waitUntilEmpty(500)
+ assert(inputRead == numRecords)
+
+ // Only supported on newer Hadoop
+ if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
+ assert(outputWritten == numBuckets)
+ }
+ assert(shuffleRead == shuffleWritten)
+ }
+
test("input metrics with interleaved reads") {
val numPartitions = 2
val cartVector = 0 to 9
@@ -193,18 +289,66 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
assert(cartesianBytes == firstSize * numPartitions + (cartVector.length * secondSize))
}
- private def runAndReturnBytesRead(job : => Unit): Long = {
- val taskBytesRead = new ArrayBuffer[Long]()
+ private def runAndReturnBytesRead(job: => Unit): Long = {
+ runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.bytesRead))
+ }
+
+ private def runAndReturnRecordsRead(job: => Unit): Long = {
+ runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.recordsRead))
+ }
+
+ private def runAndReturnRecordsWritten(job: => Unit): Long = {
+ runAndReturnMetrics(job, _.taskMetrics.outputMetrics.map(_.recordsWritten))
+ }
+
+ private def runAndReturnShuffleRecordsRead(job: => Unit): Long = {
+ runAndReturnMetrics(job, _.taskMetrics.shuffleReadMetrics.map(_.recordsRead))
+ }
+
+ private def runAndReturnShuffleRecordsWritten(job: => Unit): Long = {
+ runAndReturnMetrics(job, _.taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten))
+ }
+
+ private def runAndReturnMetrics(job: => Unit,
+ collector: (SparkListenerTaskEnd) => Option[Long]): Long = {
+ val taskMetrics = new ArrayBuffer[Long]()
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
+ collector(taskEnd).foreach(taskMetrics += _)
}
})
job
sc.listenerBus.waitUntilEmpty(500)
- taskBytesRead.sum
+ taskMetrics.sum
+ }
+
+ test("output metrics on records written") {
+ // Only supported on newer Hadoop
+ if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
+ val file = new File(tmpDir, getClass.getSimpleName)
+ val filePath = "file://" + file.getAbsolutePath
+
+ val records = runAndReturnRecordsWritten {
+ sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
+ }
+ assert(records == numRecords)
+ }
+ }
+
+ test("output metrics on records written - new Hadoop API") {
+ // Only supported on newer Hadoop
+ if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
+ val file = new File(tmpDir, getClass.getSimpleName)
+ val filePath = "file://" + file.getAbsolutePath
+
+ val records = runAndReturnRecordsWritten {
+ sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
+ .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath)
+ }
+ assert(records == numRecords)
+ }
}
test("output metrics when writing text file") {
@@ -318,4 +462,4 @@ class NewCombineTextRecordReaderWrapper(
override def getCurrentValue(): Text = delegate.getCurrentValue
override def getProgress(): Float = delegate.getProgress
override def close(): Unit = delegate.close()
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
index bbc7e1357b..c21c92b63a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
@@ -31,6 +31,8 @@ class BlockObjectWriterSuite extends FunSuite {
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
writer.write(Long.box(20))
+ // Record metrics update on every write
+ assert(writeMetrics.shuffleRecordsWritten === 1)
// Metrics don't update on every write
assert(writeMetrics.shuffleBytesWritten == 0)
// After 32 writes, metrics should update
@@ -39,6 +41,7 @@ class BlockObjectWriterSuite extends FunSuite {
writer.write(Long.box(i))
}
assert(writeMetrics.shuffleBytesWritten > 0)
+ assert(writeMetrics.shuffleRecordsWritten === 33)
writer.commitAndClose()
assert(file.length() == writeMetrics.shuffleBytesWritten)
}
@@ -51,6 +54,8 @@ class BlockObjectWriterSuite extends FunSuite {
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
writer.write(Long.box(20))
+ // Record metrics update on every write
+ assert(writeMetrics.shuffleRecordsWritten === 1)
// Metrics don't update on every write
assert(writeMetrics.shuffleBytesWritten == 0)
// After 32 writes, metrics should update
@@ -59,7 +64,23 @@ class BlockObjectWriterSuite extends FunSuite {
writer.write(Long.box(i))
}
assert(writeMetrics.shuffleBytesWritten > 0)
+ assert(writeMetrics.shuffleRecordsWritten === 33)
writer.revertPartialWritesAndClose()
assert(writeMetrics.shuffleBytesWritten == 0)
+ assert(writeMetrics.shuffleRecordsWritten == 0)
+ }
+
+ test("Reopening a closed block writer") {
+ val file = new File("somefile")
+ file.deleteOnExit()
+ val writeMetrics = new ShuffleWriteMetrics()
+ val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
+ new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
+
+ writer.open()
+ writer.close()
+ intercept[IllegalStateException] {
+ writer.open()
+ }
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 68074ae32a..e8405baa8e 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -234,7 +234,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskMetrics.incMemoryBytesSpilled(base + 6)
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
taskMetrics.setInputMetrics(Some(inputMetrics))
- inputMetrics.addBytesRead(base + 7)
+ inputMetrics.incBytesRead(base + 7)
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
taskMetrics.outputMetrics = Some(outputMetrics)
outputMetrics.setBytesWritten(base + 8)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 842f54529b..f3017dc42c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -189,6 +189,34 @@ class JsonProtocolSuite extends FunSuite {
assert(newMetrics.inputMetrics.isEmpty)
}
+ test("Input/Output records backwards compatibility") {
+ // records read were added after 1.2
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
+ hasHadoopInput = true, hasOutput = true, hasRecords = false)
+ assert(metrics.inputMetrics.nonEmpty)
+ assert(metrics.outputMetrics.nonEmpty)
+ val newJson = JsonProtocol.taskMetricsToJson(metrics)
+ val oldJson = newJson.removeField { case (field, _) => field == "Records Read" }
+ .removeField { case (field, _) => field == "Records Written" }
+ val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+ assert(newMetrics.inputMetrics.get.recordsRead == 0)
+ assert(newMetrics.outputMetrics.get.recordsWritten == 0)
+ }
+
+ test("Shuffle Read/Write records backwards compatibility") {
+ // records read were added after 1.2
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
+ hasHadoopInput = false, hasOutput = false, hasRecords = false)
+ assert(metrics.shuffleReadMetrics.nonEmpty)
+ assert(metrics.shuffleWriteMetrics.nonEmpty)
+ val newJson = JsonProtocol.taskMetricsToJson(metrics)
+ val oldJson = newJson.removeField { case (field, _) => field == "Total Records Read" }
+ .removeField { case (field, _) => field == "Shuffle Records Written" }
+ val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+ assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0)
+ assert(newMetrics.shuffleWriteMetrics.get.shuffleRecordsWritten == 0)
+ }
+
test("OutputMetrics backward compatibility") {
// OutputMetrics were added after 1.1
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true)
@@ -644,7 +672,8 @@ class JsonProtocolSuite extends FunSuite {
e: Int,
f: Int,
hasHadoopInput: Boolean,
- hasOutput: Boolean) = {
+ hasOutput: Boolean,
+ hasRecords: Boolean = true) = {
val t = new TaskMetrics
t.setHostname("localhost")
t.setExecutorDeserializeTime(a)
@@ -656,7 +685,8 @@ class JsonProtocolSuite extends FunSuite {
if (hasHadoopInput) {
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- inputMetrics.addBytesRead(d + e + f)
+ inputMetrics.incBytesRead(d + e + f)
+ inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
t.setInputMetrics(Some(inputMetrics))
} else {
val sr = new ShuffleReadMetrics
@@ -664,16 +694,19 @@ class JsonProtocolSuite extends FunSuite {
sr.incLocalBlocksFetched(e)
sr.incFetchWaitTime(a + d)
sr.incRemoteBlocksFetched(f)
+ sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1)
t.setShuffleReadMetrics(Some(sr))
}
if (hasOutput) {
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
outputMetrics.setBytesWritten(a + b + c)
+ outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c)/100 else -1)
t.outputMetrics = Some(outputMetrics)
} else {
val sw = new ShuffleWriteMetrics
sw.incShuffleBytesWritten(a + b + c)
sw.incShuffleWriteTime(b + c + d)
+ sw.setShuffleRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
t.shuffleWriteMetrics = Some(sw)
}
// Make at most 6 blocks
@@ -907,11 +940,13 @@ class JsonProtocolSuite extends FunSuite {
| "Remote Blocks Fetched": 800,
| "Local Blocks Fetched": 700,
| "Fetch Wait Time": 900,
- | "Remote Bytes Read": 1000
+ | "Remote Bytes Read": 1000,
+ | "Total Records Read" : 10
| },
| "Shuffle Write Metrics": {
| "Shuffle Bytes Written": 1200,
- | "Shuffle Write Time": 1500
+ | "Shuffle Write Time": 1500,
+ | "Shuffle Records Written": 12
| },
| "Updated Blocks": [
| {
@@ -988,11 +1023,13 @@ class JsonProtocolSuite extends FunSuite {
| "Disk Bytes Spilled": 0,
| "Shuffle Write Metrics": {
| "Shuffle Bytes Written": 1200,
- | "Shuffle Write Time": 1500
+ | "Shuffle Write Time": 1500,
+ | "Shuffle Records Written": 12
| },
| "Input Metrics": {
| "Data Read Method": "Hadoop",
- | "Bytes Read": 2100
+ | "Bytes Read": 2100,
+ | "Records Read": 21
| },
| "Updated Blocks": [
| {
@@ -1069,11 +1106,13 @@ class JsonProtocolSuite extends FunSuite {
| "Disk Bytes Spilled": 0,
| "Input Metrics": {
| "Data Read Method": "Hadoop",
- | "Bytes Read": 2100
+ | "Bytes Read": 2100,
+ | "Records Read": 21
| },
| "Output Metrics": {
| "Data Write Method": "Hadoop",
- | "Bytes Written": 1200
+ | "Bytes Written": 1200,
+ | "Records Written": 12
| },
| "Updated Blocks": [
| {