Diffstat (limited to 'core')
-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/webui.css               |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala            | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala             |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/ToolTips.scala                     |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala     |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala               | 67
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala              |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala   |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala                  |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala               |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala          | 17
13 files changed, 125 insertions, 28 deletions
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 68b33b5f0d..6c37cc8b98 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -196,7 +196,7 @@ span.additional-metric-title {
/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
-.scheduler_delay, .deserialization_time, .fetch_wait_time, .serialization_time,
-.getting_result_time {
+.scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
+.serialization_time, .getting_result_time {
display: none;
}
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index bf3f1e4fc7..df36566bec 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -202,6 +202,8 @@ class TaskMetrics extends Serializable {
merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
+ merged.incLocalBytesRead(depMetrics.localBytesRead)
+ merged.incLocalReadTime(depMetrics.localReadTime)
merged.incRecordsRead(depMetrics.recordsRead)
}
_shuffleReadMetrics = Some(merged)
@@ -344,6 +346,25 @@ class ShuffleReadMetrics extends Serializable {
private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
/**
+ * Time the task spent (in milliseconds) reading local shuffle blocks (from the local disk).
+ */
+ private var _localReadTime: Long = _
+ def localReadTime = _localReadTime
+ private[spark] def incLocalReadTime(value: Long) = _localReadTime += value
+
+ /**
+ * Shuffle data that was read from the local disk (as opposed to from a remote executor).
+ */
+ private var _localBytesRead: Long = _
+ def localBytesRead = _localBytesRead
+ private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
+
+ /**
+ * Total bytes fetched in the shuffle by this task (both remote and local).
+ */
+ def totalBytesRead = _remoteBytesRead + _localBytesRead
+
+ /**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched
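The new fields follow the same shape as the existing shuffle-read counters: a private accumulator, a public getter, a private[spark] increment method, and a total derived on read. A minimal standalone sketch of that accumulator pattern (hypothetical simplified class, not the actual org.apache.spark.executor.ShuffleReadMetrics):

class ReadMetricsSketch extends Serializable {
  private var _remoteBytesRead: Long = 0L
  private var _localBytesRead: Long = 0L
  private var _localReadTime: Long = 0L

  def remoteBytesRead: Long = _remoteBytesRead
  def localBytesRead: Long = _localBytesRead
  def localReadTime: Long = _localReadTime

  // Increment-only mutators, mirroring the private[spark] inc* methods above.
  def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v
  def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
  def incLocalReadTime(v: Long): Unit = _localReadTime += v

  // The derived total this patch introduces: remote plus local bytes.
  def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
}

object ReadMetricsSketchDemo extends App {
  val m = new ReadMetricsSketch
  m.incRemoteBytesRead(1000L)
  m.incLocalBytesRead(1100L)
  println(m.totalBytesRead) // 2100
}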
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 3bb54855ba..f9fc8aa304 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -169,7 +169,9 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
" REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
- " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
+ " REMOTE_BYTES_READ=" + metrics.remoteBytesRead +
+ " LOCAL_READ_TIME=" + metrics.localReadTime +
+ " LOCAL_BYTES_READ=" + metrics.localBytesRead
case None => ""
}
val writeMetrics = taskMetrics.shuffleWriteMetrics match {
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index ab9ee4f009..2ebb79989d 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -228,12 +228,14 @@ final class ShuffleBlockFetcherIterator(
* track in-memory are the ManagedBuffer references themselves.
*/
private[this] def fetchLocalBlocks() {
+ val startTime = System.currentTimeMillis
val iter = localBlocks.iterator
while (iter.hasNext) {
val blockId = iter.next()
try {
val buf = blockManager.getBlockData(blockId)
shuffleMetrics.incLocalBlocksFetched(1)
+ shuffleMetrics.incLocalBytesRead(buf.size)
buf.retain()
results.put(new SuccessFetchResult(blockId, 0, buf))
} catch {
@@ -244,6 +246,7 @@ final class ShuffleBlockFetcherIterator(
return
}
}
+ shuffleMetrics.incLocalReadTime(System.currentTimeMillis - startTime)
}
private[this] def initialize(): Unit = {
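Note that the timing brackets the entire local-fetch loop with a single System.currentTimeMillis pair rather than timing each block, so the measurement overhead stays constant regardless of how many local blocks a task reads. A hedged sketch of the same shape, with hypothetical function parameters standing in for blockManager.getBlockData and the shuffleMetrics calls:

object LocalReadTimingSketch {
  def fetchLocalBlocks(
      blockIds: Seq[String],
      readBlockSize: String => Long,    // stand-in: returns a block's size in bytes
      incLocalBytesRead: Long => Unit,  // stand-in for shuffleMetrics.incLocalBytesRead
      incLocalReadTime: Long => Unit): Unit = {
    val startTime = System.currentTimeMillis
    blockIds.foreach { id =>
      incLocalBytesRead(readBlockSize(id)) // per-block byte count, as in the patch
    }
    // One timing increment for the whole loop, not one per block.
    incLocalReadTime(System.currentTimeMillis - startTime)
  }
}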
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index 3a15e603b1..cae6870c2a 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -37,8 +37,12 @@ private[spark] object ToolTips {
"Bytes and records written to disk in order to be read by a shuffle in a future stage."
val SHUFFLE_READ =
- """Bytes and records read from remote executors. Typically less than shuffle write bytes
- because this does not include shuffle data read locally."""
+ """Total shuffle bytes and records read (includes both data read locally and data read from
+ remote executors). """
+
+ val SHUFFLE_READ_REMOTE_SIZE =
+ """Total shuffle bytes read from remote executors. This is a subset of the shuffle
+ read bytes; the remaining shuffle data is read locally. """
val GETTING_RESULT_TIME =
"""Time that the driver spends fetching task results from workers. If this is large, consider
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index f463f8d7c7..0b6fe70bd2 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -401,9 +401,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta
val shuffleReadDelta =
- (taskMetrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L)
- - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L))
- stageData.shuffleReadBytes += shuffleReadDelta
+ (taskMetrics.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L)
+ - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.totalBytesRead).getOrElse(0L))
+ stageData.shuffleReadTotalBytes += shuffleReadDelta
execSummary.shuffleRead += shuffleReadDelta
val shuffleReadRecordsDelta =
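The listener updates stage-level totals as a delta: the task's new total minus its previously recorded total, with missing Options defaulting to zero so tasks without shuffle-read metrics contribute nothing. A small sketch of that Option-handling pattern (hypothetical minimal types; Spark's are TaskMetrics and ShuffleReadMetrics):

object DeltaSketch {
  case class ReadMetrics(totalBytesRead: Long)
  case class Metrics(shuffleReadMetrics: Option[ReadMetrics])

  def shuffleReadDelta(current: Metrics, old: Option[Metrics]): Long =
    current.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L) -
      old.flatMap(_.shuffleReadMetrics).map(_.totalBytesRead).getOrElse(0L)

  def main(args: Array[String]): Unit = {
    val updated = Metrics(Some(ReadMetrics(210L)))
    val previous = Some(Metrics(Some(ReadMetrics(101L))))
    println(shuffleReadDelta(updated, previous)) // 109: only the newly read bytes count
  }
}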
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 05ffd5bc58..d752434ad5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -85,7 +85,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{if (stageData.hasShuffleRead) {
<li>
<strong>Shuffle read: </strong>
- {s"${Utils.bytesToString(stageData.shuffleReadBytes)} / " +
+ {s"${Utils.bytesToString(stageData.shuffleReadTotalBytes)} / " +
s"${stageData.shuffleReadRecords}"}
</li>
}}
@@ -143,6 +143,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<span class="additional-metric-title">Shuffle Read Blocked Time</span>
</span>
</li>
+ <li>
+ <span data-toggle="tooltip"
+ title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
+ <input type="checkbox" name={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}/>
+ <span class="additional-metric-title">Shuffle Remote Reads</span>
+ </span>
+ </li>
}}
<li>
<span data-toggle="tooltip"
@@ -181,7 +188,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{if (stageData.hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
{if (stageData.hasShuffleRead) {
Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
- ("Shuffle Read Size / Records", ""))
+ ("Shuffle Read Size / Records", ""),
+ ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
} else {
Nil
}} ++
@@ -320,19 +328,41 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
}
- val shuffleReadBlockedQuantiles = <td>Shuffle Read Blocked Time</td> +:
+ val shuffleReadBlockedQuantiles =
+ <td>
+ <span data-toggle="tooltip"
+ title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right">
+ Shuffle Read Blocked Time
+ </span>
+ </td> +:
getFormattedTimeQuantiles(shuffleReadBlockedTimes)
- val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ val shuffleReadTotalSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
}
-
- val shuffleReadRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+ val shuffleReadTotalRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
}
+ val shuffleReadTotalQuantiles =
+ <td>
+ <span data-toggle="tooltip"
+ title={ToolTips.SHUFFLE_READ} data-placement="right">
+ Shuffle Read Size / Records
+ </span>
+ </td> +:
+ getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)
- val shuffleReadQuantiles = <td>Shuffle Read Size / Records (Remote)</td> +:
- getFormattedSizeQuantilesWithRecords(shuffleReadSizes, shuffleReadRecords)
+ val shuffleReadRemoteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ }
+ val shuffleReadRemoteQuantiles =
+ <td>
+ <span data-toggle="tooltip"
+ title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
+ Shuffle Remote Reads
+ </span>
+ </td> +:
+ getFormattedSizeQuantiles(shuffleReadRemoteSizes)
val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
@@ -374,7 +404,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
{shuffleReadBlockedQuantiles}
</tr>
- <tr>{shuffleReadQuantiles}</tr>
+ <tr>{shuffleReadTotalQuantiles}</tr>
+ <tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
+ {shuffleReadRemoteQuantiles}
+ </tr>
} else {
Nil
},
@@ -454,11 +487,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val shuffleReadBlockedTimeReadable =
maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
- val shuffleReadSortable = maybeShuffleRead.map(_.remoteBytesRead.toString).getOrElse("")
- val shuffleReadReadable = maybeShuffleRead
- .map(m => s"${Utils.bytesToString(m.remoteBytesRead)}").getOrElse("")
+ val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
+ val shuffleReadSortable = totalShuffleBytes.map(_.toString).getOrElse("")
+ val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
+ val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
+ val shuffleReadRemoteSortable = remoteShuffleBytes.map(_.toString).getOrElse("")
+ val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
+
val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
val shuffleWriteReadable = maybeShuffleWrite
@@ -536,6 +573,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<td sorttable_customkey={shuffleReadSortable}>
{s"$shuffleReadReadable / $shuffleReadRecords"}
</td>
+ <td sorttable_customkey={shuffleReadRemoteSortable}
+ class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
+ {shuffleReadRemoteReadable}
+ </td>
}}
{if (hasShuffleWrite) {
<td sorttable_customkey={writeTimeSortable}>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 703d43f9c6..5865850fa0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -138,7 +138,7 @@ private[ui] class StageTableBase(
val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else ""
val outputWrite = stageData.outputBytes
val outputWriteWithUnit = if (outputWrite > 0) Utils.bytesToString(outputWrite) else ""
- val shuffleRead = stageData.shuffleReadBytes
+ val shuffleRead = stageData.shuffleReadTotalBytes
val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else ""
val shuffleWrite = stageData.shuffleWriteBytes
val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
index 37cf2c207b..9bf67db8ac 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
@@ -28,6 +28,7 @@ private[spark] object TaskDetailsClassNames {
val SCHEDULER_DELAY = "scheduler_delay"
val TASK_DESERIALIZATION_TIME = "deserialization_time"
val SHUFFLE_READ_BLOCKED_TIME = "fetch_wait_time"
+ val SHUFFLE_READ_REMOTE_SIZE = "shuffle_read_remote"
val RESULT_SERIALIZATION_TIME = "serialization_time"
val GETTING_RESULT_TIME = "getting_result_time"
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 69aac6c862..dbf1ceeda1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -80,7 +80,7 @@ private[jobs] object UIData {
var inputRecords: Long = _
var outputBytes: Long = _
var outputRecords: Long = _
- var shuffleReadBytes: Long = _
+ var shuffleReadTotalBytes: Long = _
var shuffleReadRecords : Long = _
var shuffleWriteBytes: Long = _
var shuffleWriteRecords: Long = _
@@ -96,7 +96,7 @@ private[jobs] object UIData {
def hasInput = inputBytes > 0
def hasOutput = outputBytes > 0
- def hasShuffleRead = shuffleReadBytes > 0
+ def hasShuffleRead = shuffleReadTotalBytes > 0
def hasShuffleWrite = shuffleWriteBytes > 0
def hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index b0b545640f..58d37e2d66 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -294,6 +294,8 @@ private[spark] object JsonProtocol {
("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~
+ ("Local Read Time" -> shuffleReadMetrics.localReadTime) ~
+ ("Local Bytes Read" -> shuffleReadMetrics.localBytesRead) ~
("Total Records Read" -> shuffleReadMetrics.recordsRead)
}
@@ -674,6 +676,8 @@ private[spark] object JsonProtocol {
metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
+ metrics.incLocalReadTime((json \ "Local Read Time").extractOpt[Long].getOrElse(0))
+ metrics.incLocalBytesRead((json \ "Local Bytes Read").extractOpt[Long].getOrElse(0))
metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0))
metrics
}
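On the read side, extractOpt (rather than extract) keeps the protocol backwards compatible: event logs written before this change simply lack the two new fields, and the metrics default to 0 instead of failing to parse. A minimal json4s sketch of that pattern, assuming json4s on the classpath (as Spark's JsonProtocol does):

import org.json4s._
import org.json4s.jackson.JsonMethods._

object CompatReadSketch extends App {
  implicit val formats: Formats = DefaultFormats

  // An "old" event-log fragment that predates the local-read fields.
  val oldJson = parse("""{"Remote Bytes Read": 1000}""")

  // extractOpt returns None when a field is absent, so old logs decode cleanly.
  val localBytes = (oldJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L)
  val localTime = (oldJson \ "Local Read Time").extractOpt[Long].getOrElse(0L)

  println(s"localBytes=$localBytes, localTime=$localTime") // both 0
}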
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index e8405baa8e..6019282d2f 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -227,6 +227,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
shuffleReadMetrics.incRemoteBytesRead(base + 1)
+ shuffleReadMetrics.incLocalBytesRead(base + 9)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
taskMetrics.setExecutorRunTime(base + 4)
@@ -260,8 +261,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
var stage0Data = listener.stageIdToData.get((0, 0)).get
var stage1Data = listener.stageIdToData.get((1, 0)).get
- assert(stage0Data.shuffleReadBytes == 102)
- assert(stage1Data.shuffleReadBytes == 201)
+ assert(stage0Data.shuffleReadTotalBytes == 220)
+ assert(stage1Data.shuffleReadTotalBytes == 410)
assert(stage0Data.shuffleWriteBytes == 106)
assert(stage1Data.shuffleWriteBytes == 203)
assert(stage0Data.executorRunTime == 108)
@@ -290,8 +291,11 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
stage0Data = listener.stageIdToData.get((0, 0)).get
stage1Data = listener.stageIdToData.get((1, 0)).get
- assert(stage0Data.shuffleReadBytes == 402)
- assert(stage1Data.shuffleReadBytes == 602)
+ // Task 1235 contributed (100+1)+(100+9) = 210 shuffle bytes, and task 1234 contributed
+ // (300+1)+(300+9) = 610 total shuffle bytes, so the total for the stage is 820.
+ assert(stage0Data.shuffleReadTotalBytes == 820)
+ // Task 1236 contributed 410 shuffle bytes, and task 1237 contributed 810 shuffle bytes.
+ assert(stage1Data.shuffleReadTotalBytes == 1220)
assert(stage0Data.shuffleWriteBytes == 406)
assert(stage1Data.shuffleWriteBytes == 606)
assert(stage0Data.executorRunTime == 408)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f3017dc42c..c181baf684 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -260,6 +260,19 @@ class JsonProtocolSuite extends FunSuite {
assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
}
+ test("ShuffleReadMetrics: Local bytes read and time taken backwards compatibility") {
+ // Metrics about local shuffle bytes read and local read time were added in 1.3.1.
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
+ hasHadoopInput = false, hasOutput = false, hasRecords = false)
+ assert(metrics.shuffleReadMetrics.nonEmpty)
+ val newJson = JsonProtocol.taskMetricsToJson(metrics)
+ val oldJson = newJson.removeField { case (field, _) => field == "Local Bytes Read" }
+ .removeField { case (field, _) => field == "Local Read Time" }
+ val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+ assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
+ assert(newMetrics.shuffleReadMetrics.get.localReadTime == 0)
+ }
+
test("SparkListenerApplicationStart backwards compatibility") {
// SparkListenerApplicationStart events in Spark 1.0.0 do not have an "appId" property.
val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
@@ -695,6 +708,8 @@ class JsonProtocolSuite extends FunSuite {
sr.incFetchWaitTime(a + d)
sr.incRemoteBlocksFetched(f)
sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1)
+ sr.incLocalReadTime(a + e)
+ sr.incLocalBytesRead(a + f)
t.setShuffleReadMetrics(Some(sr))
}
if (hasOutput) {
@@ -941,6 +956,8 @@ class JsonProtocolSuite extends FunSuite {
| "Local Blocks Fetched": 700,
| "Fetch Wait Time": 900,
| "Remote Bytes Read": 1000,
+ | "Local Read Time": 1000,
+ | "Local Bytes Read": 1100,
| "Total Records Read" : 10
| },
| "Shuffle Write Metrics": {