author     Kay Ousterhout <kayousterhout@gmail.com>  2015-02-12 14:35:44 -0800
committer  Andrew Or <andrew@databricks.com>         2015-02-12 14:36:27 -0800
commit     893d6fd7049daf3c4d01eb6a960801cd064d5f73 (patch)
tree       c7571d8be3859ed07458d25741d7e625cac8804a /core
parent     aa4ca8b873fd83e64e5faea6f7febcc830e30b02 (diff)
[SPARK-5645] Added local read bytes/time to task metrics
ksakellis I stumbled on your JIRA for this yesterday; I know it's assigned to you, but I'd already done this for my own uses a while ago, so I thought I could save you the work of doing it! Hopefully this doesn't duplicate any work you've already done.

Here's a screenshot of what the UI looks like:

![image](https://cloud.githubusercontent.com/assets/1108612/6135352/c03e7276-b11c-11e4-8f11-c6aefe1f35b9.png)

Based on a discussion with pwendell, I put the data read remotely in as an additional metric rather than showing it in brackets as you'd suggested, Kostas. The assumption here is that the average user doesn't care about the distinction between local and remote data, so it's better not to pollute the UI with it.

I also added data about the local read time, which I've found very helpful for debugging, but I didn't put it in the UI because it's probably not something many people will need to use.

With this change, the total read time and total write time shown in the UI will be equal, fixing a long-standing source of user confusion:

![image](https://cloud.githubusercontent.com/assets/1108612/6135399/25f14490-b11d-11e4-8086-20be5f4002e6.png)

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #4510 from kayousterhout/SPARK-5645 and squashes the following commits:

4a0182c [Kay Ousterhout] oops
5f5da1b [Kay Ousterhout] Small style fix
5da04cf [Kay Ousterhout] Addressed more comments from Kostas
ba05149 [Kay Ousterhout] Remove parens
a9dc685 [Kay Ousterhout] Kostas comment, test fix
33d2e2d [Kay Ousterhout] Merge remote-tracking branch 'upstream/master' into SPARK-5645
347e2cd [Kay Ousterhout] [SPARK-5645] Added local read bytes/time to task metrics
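For anyone who wants to consume the two new metrics programmatically rather than through the UI, here is a minimal sketch of a listener. This is illustrative only, not part of this patch; it assumes the Spark 1.3-era listener API, where `taskMetrics` may be null and `shuffleReadMetrics` is an `Option`.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener: logs the new local read bytes/time alongside
// the existing remote read metrics after each task finishes.
class ShuffleReadLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    for (metrics <- Option(taskEnd.taskMetrics);
         sr <- metrics.shuffleReadMetrics) {
      println(s"task ${taskEnd.taskInfo.taskId}: " +
        s"local=${sr.localBytesRead} bytes in ${sr.localReadTime} ms, " +
        s"remote=${sr.remoteBytesRead} bytes, total=${sr.totalBytesRead} bytes")
    }
  }
}

// Register with: sc.addSparkListener(new ShuffleReadLogger)
```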
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/webui.css                   |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala                | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala                 |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/ToolTips.scala                         |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala         |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala                   | 67
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala                  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala       |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala                      |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                   |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala    | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala              | 17
13 files changed, 125 insertions(+), 28 deletions(-)
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 68b33b5f0d..6c37cc8b98 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -196,7 +196,7 @@ span.additional-metric-title {
/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
-.scheduler_delay, .deserialization_time, .fetch_wait_time, .serialization_time,
-.getting_result_time {
+.scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
+.serialization_time, .getting_result_time {
display: none;
}
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index bf3f1e4fc7..df36566bec 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -202,6 +202,8 @@ class TaskMetrics extends Serializable {
merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
+ merged.incLocalBytesRead(depMetrics.localBytesRead)
+ merged.incLocalReadTime(depMetrics.localReadTime)
merged.incRecordsRead(depMetrics.recordsRead)
}
_shuffleReadMetrics = Some(merged)
@@ -344,6 +346,25 @@ class ShuffleReadMetrics extends Serializable {
private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
/**
+ * Time the task spent (in milliseconds) reading local shuffle blocks (from the local disk).
+ */
+ private var _localReadTime: Long = _
+ def localReadTime = _localReadTime
+ private[spark] def incLocalReadTime(value: Long) = _localReadTime += value
+
+ /**
+ * Shuffle data that was read from the local disk (as opposed to from a remote executor).
+ */
+ private var _localBytesRead: Long = _
+ def localBytesRead = _localBytesRead
+ private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
+
+ /**
+ * Total bytes fetched in the shuffle by this task (both remote and local).
+ */
+ def totalBytesRead = _remoteBytesRead + _localBytesRead
+
+ /**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched
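
The new fields follow the accessor/incrementer pattern used throughout TaskMetrics: a private mutable counter, a public getter, an incrementer restricted to `private[spark]`, and a derived total that is computed on read rather than stored, so it can never drift out of sync with the counters. A standalone sketch of just that pattern (the `private[spark]` restriction is dropped here so the sketch compiles outside the Spark package):

```scala
// Minimal sketch of the ShuffleReadMetrics pattern, not the real class.
class ReadMetricsSketch {
  private var _remoteBytesRead: Long = 0L
  private var _localBytesRead: Long = 0L

  def remoteBytesRead: Long = _remoteBytesRead
  def localBytesRead: Long = _localBytesRead

  // Derived on read: always remote + local, never a third counter.
  def totalBytesRead: Long = _remoteBytesRead + _localBytesRead

  def incRemoteBytesRead(value: Long): Unit = _remoteBytesRead += value
  def incLocalBytesRead(value: Long): Unit = _localBytesRead += value
}
```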
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 3bb54855ba..f9fc8aa304 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -169,7 +169,9 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
" REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
- " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
+ " REMOTE_BYTES_READ=" + metrics.remoteBytesRead +
+ " LOCAL_READ_TIME=" + metrics.localReadTime +
+ " LOCAL_BYTES_READ=" + metrics.localBytesRead
case None => ""
}
val writeMetrics = taskMetrics.shuffleWriteMetrics match {
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index ab9ee4f009..2ebb79989d 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -228,12 +228,14 @@ final class ShuffleBlockFetcherIterator(
* track in-memory are the ManagedBuffer references themselves.
*/
private[this] def fetchLocalBlocks() {
+ val startTime = System.currentTimeMillis
val iter = localBlocks.iterator
while (iter.hasNext) {
val blockId = iter.next()
try {
val buf = blockManager.getBlockData(blockId)
shuffleMetrics.incLocalBlocksFetched(1)
+ shuffleMetrics.incLocalBytesRead(buf.size)
buf.retain()
results.put(new SuccessFetchResult(blockId, 0, buf))
} catch {
@@ -244,6 +246,7 @@ final class ShuffleBlockFetcherIterator(
return
}
}
+ shuffleMetrics.incLocalReadTime(System.currentTimeMillis - startTime)
}
private[this] def initialize(): Unit = {
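
The local read time above is measured by bracketing the whole loop with wall-clock timestamps rather than timing each block individually, which keeps the overhead to two `System.currentTimeMillis` calls per fetch. The same pattern as a hypothetical helper (not code from this patch; note the real loop can return early on a failed fetch, in which case it skips the final increment):

```scala
// Run `body`, then hand the elapsed wall-clock milliseconds to `record`.
def timedMillis[T](record: Long => Unit)(body: => T): T = {
  val start = System.currentTimeMillis
  val result = body
  record(System.currentTimeMillis - start)
  result
}

// e.g. timedMillis(shuffleMetrics.incLocalReadTime) { readAllLocalBlocks() }
```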
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index 3a15e603b1..cae6870c2a 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -37,8 +37,12 @@ private[spark] object ToolTips {
"Bytes and records written to disk in order to be read by a shuffle in a future stage."
val SHUFFLE_READ =
- """Bytes and records read from remote executors. Typically less than shuffle write bytes
- because this does not include shuffle data read locally."""
+ """Total shuffle bytes and records read (includes both data read locally and data read from
+ remote executors). """
+
+ val SHUFFLE_READ_REMOTE_SIZE =
+ """Total shuffle bytes read from remote executors. This is a subset of the shuffle
+ read bytes; the remaining shuffle data is read locally. """
val GETTING_RESULT_TIME =
"""Time that the driver spends fetching task results from workers. If this is large, consider
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index f463f8d7c7..0b6fe70bd2 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -401,9 +401,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta
val shuffleReadDelta =
- (taskMetrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L)
- - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L))
- stageData.shuffleReadBytes += shuffleReadDelta
+ (taskMetrics.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L)
+ - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.totalBytesRead).getOrElse(0L))
+ stageData.shuffleReadTotalBytes += shuffleReadDelta
execSummary.shuffleRead += shuffleReadDelta
val shuffleReadRecordsDelta =
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 05ffd5bc58..d752434ad5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -85,7 +85,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{if (stageData.hasShuffleRead) {
<li>
<strong>Shuffle read: </strong>
- {s"${Utils.bytesToString(stageData.shuffleReadBytes)} / " +
+ {s"${Utils.bytesToString(stageData.shuffleReadTotalBytes)} / " +
s"${stageData.shuffleReadRecords}"}
</li>
}}
@@ -143,6 +143,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<span class="additional-metric-title">Shuffle Read Blocked Time</span>
</span>
</li>
+ <li>
+ <span data-toggle="tooltip"
+ title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
+ <input type="checkbox" name={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}/>
+ <span class="additional-metric-title">Shuffle Remote Reads</span>
+ </span>
+ </li>
}}
<li>
<span data-toggle="tooltip"
@@ -181,7 +188,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{if (stageData.hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
{if (stageData.hasShuffleRead) {
Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
- ("Shuffle Read Size / Records", ""))
+ ("Shuffle Read Size / Records", ""),
+ ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
} else {
Nil
}} ++
@@ -320,19 +328,41 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
}
- val shuffleReadBlockedQuantiles = <td>Shuffle Read Blocked Time</td> +:
+ val shuffleReadBlockedQuantiles =
+ <td>
+ <span data-toggle="tooltip"
+ title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right">
+ Shuffle Read Blocked Time
+ </span>
+ </td> +:
getFormattedTimeQuantiles(shuffleReadBlockedTimes)
- val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ val shuffleReadTotalSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
}
-
- val shuffleReadRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+ val shuffleReadTotalRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
}
+ val shuffleReadTotalQuantiles =
+ <td>
+ <span data-toggle="tooltip"
+ title={ToolTips.SHUFFLE_READ} data-placement="right">
+ Shuffle Read Size / Records
+ </span>
+ </td> +:
+ getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)
- val shuffleReadQuantiles = <td>Shuffle Read Size / Records (Remote)</td> +:
- getFormattedSizeQuantilesWithRecords(shuffleReadSizes, shuffleReadRecords)
+ val shuffleReadRemoteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ }
+ val shuffleReadRemoteQuantiles =
+ <td>
+ <span data-toggle="tooltip"
+ title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
+ Shuffle Remote Reads
+ </span>
+ </td> +:
+ getFormattedSizeQuantiles(shuffleReadRemoteSizes)
val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
@@ -374,7 +404,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
{shuffleReadBlockedQuantiles}
</tr>
- <tr>{shuffleReadQuantiles}</tr>
+ <tr>{shuffleReadTotalQuantiles}</tr>
+ <tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
+ {shuffleReadRemoteQuantiles}
+ </tr>
} else {
Nil
},
@@ -454,11 +487,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val shuffleReadBlockedTimeReadable =
maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
- val shuffleReadSortable = maybeShuffleRead.map(_.remoteBytesRead.toString).getOrElse("")
- val shuffleReadReadable = maybeShuffleRead
- .map(m => s"${Utils.bytesToString(m.remoteBytesRead)}").getOrElse("")
+ val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
+ val shuffleReadSortable = totalShuffleBytes.map(_.toString).getOrElse("")
+ val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
+ val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
+ val shuffleReadRemoteSortable = remoteShuffleBytes.map(_.toString).getOrElse("")
+ val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
+
val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
val shuffleWriteReadable = maybeShuffleWrite
@@ -536,6 +573,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<td sorttable_customkey={shuffleReadSortable}>
{s"$shuffleReadReadable / $shuffleReadRecords"}
</td>
+ <td sorttable_customkey={shuffleReadRemoteSortable}
+ class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
+ {shuffleReadRemoteReadable}
+ </td>
}}
{if (hasShuffleWrite) {
<td sorttable_customkey={writeTimeSortable}>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 703d43f9c6..5865850fa0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -138,7 +138,7 @@ private[ui] class StageTableBase(
val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else ""
val outputWrite = stageData.outputBytes
val outputWriteWithUnit = if (outputWrite > 0) Utils.bytesToString(outputWrite) else ""
- val shuffleRead = stageData.shuffleReadBytes
+ val shuffleRead = stageData.shuffleReadTotalBytes
val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else ""
val shuffleWrite = stageData.shuffleWriteBytes
val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
index 37cf2c207b..9bf67db8ac 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
@@ -28,6 +28,7 @@ private[spark] object TaskDetailsClassNames {
val SCHEDULER_DELAY = "scheduler_delay"
val TASK_DESERIALIZATION_TIME = "deserialization_time"
val SHUFFLE_READ_BLOCKED_TIME = "fetch_wait_time"
+ val SHUFFLE_READ_REMOTE_SIZE = "shuffle_read_remote"
val RESULT_SERIALIZATION_TIME = "serialization_time"
val GETTING_RESULT_TIME = "getting_result_time"
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 69aac6c862..dbf1ceeda1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -80,7 +80,7 @@ private[jobs] object UIData {
var inputRecords: Long = _
var outputBytes: Long = _
var outputRecords: Long = _
- var shuffleReadBytes: Long = _
+ var shuffleReadTotalBytes: Long = _
var shuffleReadRecords : Long = _
var shuffleWriteBytes: Long = _
var shuffleWriteRecords: Long = _
@@ -96,7 +96,7 @@ private[jobs] object UIData {
def hasInput = inputBytes > 0
def hasOutput = outputBytes > 0
- def hasShuffleRead = shuffleReadBytes > 0
+ def hasShuffleRead = shuffleReadTotalBytes > 0
def hasShuffleWrite = shuffleWriteBytes > 0
def hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index b0b545640f..58d37e2d66 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -294,6 +294,8 @@ private[spark] object JsonProtocol {
("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~
+ ("Local Read Time" -> shuffleReadMetrics.localReadTime) ~
+ ("Local Bytes Read" -> shuffleReadMetrics.localBytesRead) ~
("Total Records Read" -> shuffleReadMetrics.recordsRead)
}
@@ -674,6 +676,8 @@ private[spark] object JsonProtocol {
metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
+ metrics.incLocalReadTime((json \ "Local Read Time").extractOpt[Long].getOrElse(0))
+ metrics.incLocalBytesRead((json \ "Local Bytes Read").extractOpt[Long].getOrElse(0))
metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0))
metrics
}
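
The `extractOpt[Long].getOrElse(0)` calls are what keep old event logs readable: logs written before this patch simply lack the two new fields, and a plain `extract` would throw on the missing key. A self-contained sketch of that json4s pattern:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object BackCompatSketch extends App {
  implicit val formats: Formats = DefaultFormats

  // An event-log fragment written before this patch: no "Local Bytes Read".
  val oldJson = parse("""{"Remote Bytes Read": 1000}""")

  // extractOpt returns None for the missing field instead of throwing,
  // and getOrElse supplies the historical default of zero.
  val localBytesRead = (oldJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L)
  assert(localBytesRead == 0L)
}
```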
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index e8405baa8e..6019282d2f 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -227,6 +227,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
shuffleReadMetrics.incRemoteBytesRead(base + 1)
+ shuffleReadMetrics.incLocalBytesRead(base + 9)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
taskMetrics.setExecutorRunTime(base + 4)
@@ -260,8 +261,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
var stage0Data = listener.stageIdToData.get((0, 0)).get
var stage1Data = listener.stageIdToData.get((1, 0)).get
- assert(stage0Data.shuffleReadBytes == 102)
- assert(stage1Data.shuffleReadBytes == 201)
+ assert(stage0Data.shuffleReadTotalBytes == 220)
+ assert(stage1Data.shuffleReadTotalBytes == 410)
assert(stage0Data.shuffleWriteBytes == 106)
assert(stage1Data.shuffleWriteBytes == 203)
assert(stage0Data.executorRunTime == 108)
@@ -290,8 +291,11 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
stage0Data = listener.stageIdToData.get((0, 0)).get
stage1Data = listener.stageIdToData.get((1, 0)).get
- assert(stage0Data.shuffleReadBytes == 402)
- assert(stage1Data.shuffleReadBytes == 602)
+ // Task 1235 contributed (100+1)+(100+9) = 210 shuffle bytes, and task 1234 contributed
+ // (300+1)+(300+9) = 610 total shuffle bytes, so the total for the stage is 820.
+ assert(stage0Data.shuffleReadTotalBytes == 820)
+ // Task 1236 contributed 410 shuffle bytes, and task 1237 contributed 810 shuffle bytes.
+ assert(stage1Data.shuffleReadTotalBytes == 1220)
assert(stage0Data.shuffleWriteBytes == 406)
assert(stage1Data.shuffleWriteBytes == 606)
assert(stage0Data.executorRunTime == 408)
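
A quick standalone check of the arithmetic in the comments above (the base values 100/300 and 200/400 are inferred from those comments, not verified against the suite's helper):

```scala
// Each task now contributes (base + 1) remote + (base + 9) local shuffle bytes.
def taskShuffleBytes(base: Long): Long = (base + 1) + (base + 9)

assert(taskShuffleBytes(100) + taskShuffleBytes(300) == 820)  // stage 0
assert(taskShuffleBytes(200) + taskShuffleBytes(400) == 1220) // stage 1
```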
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f3017dc42c..c181baf684 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -260,6 +260,19 @@ class JsonProtocolSuite extends FunSuite {
assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
}
+ test("ShuffleReadMetrics: Local bytes read and time taken backwards compatibility") {
+ // Metrics about local shuffle bytes read and local read time were added in 1.3.1.
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
+ hasHadoopInput = false, hasOutput = false, hasRecords = false)
+ assert(metrics.shuffleReadMetrics.nonEmpty)
+ val newJson = JsonProtocol.taskMetricsToJson(metrics)
+ val oldJson = newJson.removeField { case (field, _) => field == "Local Bytes Read" }
+ .removeField { case (field, _) => field == "Local Read Time" }
+ val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+ assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
+ assert(newMetrics.shuffleReadMetrics.get.localReadTime == 0)
+ }
+
test("SparkListenerApplicationStart backwards compatibility") {
// SparkListenerApplicationStart in Spark 1.0.0 do not have an "appId" property.
val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
@@ -695,6 +708,8 @@ class JsonProtocolSuite extends FunSuite {
sr.incFetchWaitTime(a + d)
sr.incRemoteBlocksFetched(f)
sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1)
+ sr.incLocalReadTime(a + e)
+ sr.incLocalBytesRead(a + f)
t.setShuffleReadMetrics(Some(sr))
}
if (hasOutput) {
@@ -941,6 +956,8 @@ class JsonProtocolSuite extends FunSuite {
| "Local Blocks Fetched": 700,
| "Fetch Wait Time": 900,
| "Remote Bytes Read": 1000,
+ | "Local Read Time": 1000,
+ | "Local Bytes Read": 1100,
| "Total Records Read" : 10
| },
| "Shuffle Write Metrics": {