-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala                 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala                  1
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                    2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala               3
5 files changed, 0 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index df36566bec..07b152651d 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -203,7 +203,6 @@ class TaskMetrics extends Serializable {
       merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
       merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
       merged.incLocalBytesRead(depMetrics.localBytesRead)
-      merged.incLocalReadTime(depMetrics.localReadTime)
       merged.incRecordsRead(depMetrics.recordsRead)
     }
     _shuffleReadMetrics = Some(merged)
@@ -346,13 +345,6 @@ class ShuffleReadMetrics extends Serializable {
   private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
 
   /**
-   * Time the task spent (in milliseconds) reading local shuffle blocks (from the local disk).
-   */
-  private var _localReadTime: Long = _
-  def localReadTime = _localReadTime
-  private[spark] def incLocalReadTime(value: Long) = _localReadTime += value
-
-  /**
    * Shuffle data that was read from the local disk (as opposed to from a remote executor).
    */
   private var _localBytesRead: Long = _
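
For reference, the members removed above follow the accumulator-style pattern ShuffleReadMetrics uses for every counter: a private mutable backing field, a public read-only accessor, and a package-private incrementer so that only Spark internals can mutate the value. A minimal sketch of that shape (the class and field names below are illustrative, not Spark API):

    package org.apache.spark

    // Illustrative sketch of the counter pattern, not actual Spark code.
    class ExampleMetrics extends Serializable {
      private var _bytesRead: Long = _    // backing field, defaults to 0L
      def bytesRead: Long = _bytesRead    // public read-only accessor
      private[spark] def incBytesRead(value: Long): Unit = _bytesRead += value
    }

The merge loop in the first hunk relies on the same shape: it folds each dependency's counters into one merged instance by calling the inc* methods.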
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index f9fc8aa304..8aa528ac57 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -170,7 +170,6 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
         " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
         " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
         " REMOTE_BYTES_READ=" + metrics.remoteBytesRead +
-        " LOCAL_READ_TIME=" + metrics.localReadTime +
         " LOCAL_BYTES_READ=" + metrics.localBytesRead
       case None => ""
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 2ebb79989d..8f28ef49a8 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -228,7 +228,6 @@ final class ShuffleBlockFetcherIterator(
    * track in-memory are the ManagedBuffer references themselves.
    */
   private[this] def fetchLocalBlocks() {
-    val startTime = System.currentTimeMillis
     val iter = localBlocks.iterator
     while (iter.hasNext) {
       val blockId = iter.next()
@@ -246,7 +245,6 @@ final class ShuffleBlockFetcherIterator(
           return
       }
     }
-    shuffleMetrics.incLocalReadTime(System.currentTimeMillis - startTime)
   }
 
   private[this] def initialize(): Unit = {
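
The two deletions above bracket the local-fetch loop with wall-clock timestamps; note that the early return in the failure path exits fetchLocalBlocks() before the increment, so a failed fetch recorded no local read time at all. As a standalone sketch, the timing pattern being removed looks roughly like this (the helper name is ours, not Spark's; the try/finally records even on early exit, which the deleted code did not):

    // Illustrative helper, not Spark API: time a block of work in milliseconds
    // and hand the elapsed value to a metrics callback.
    object TimingSketch {
      def timedMillis[T](recordMillis: Long => Unit)(body: => T): T = {
        val startTime = System.currentTimeMillis
        try body
        finally recordMillis(System.currentTimeMillis - startTime)
      }
      // e.g. timedMillis(ms => println(s"local reads took $ms ms")) { fetchLoop() }
    }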
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 58d37e2d66..8e20864db5 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -294,7 +294,6 @@ private[spark] object JsonProtocol {
     ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
     ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
     ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~
-    ("Local Read Time" -> shuffleReadMetrics.localReadTime) ~
     ("Local Bytes Read" -> shuffleReadMetrics.localBytesRead) ~
     ("Total Records Read" -> shuffleReadMetrics.recordsRead)
   }
@@ -676,7 +675,6 @@ private[spark] object JsonProtocol {
     metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
     metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
     metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
-    metrics.incLocalReadTime((json \ "Local Read Time").extractOpt[Long].getOrElse(0))
     metrics.incLocalBytesRead((json \ "Local Bytes Read").extractOpt[Long].getOrElse(0))
     metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0))
     metrics
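
Both JsonProtocol hunks drop "Local Read Time" from the event-log round trip. The surviving lines show the backward-compatibility idiom this file uses: extractOpt yields None when a field is absent from an older log, and getOrElse substitutes a default instead of throwing. A self-contained sketch of the idiom with json4s (the payload below is made up):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    object ExtractOptDemo extends App {
      implicit val formats: Formats = DefaultFormats
      val json = parse("""{"Local Bytes Read": 1100}""")

      // Field present: extracted normally.
      val localBytes = (json \ "Local Bytes Read").extractOpt[Long].getOrElse(0L) // 1100
      // Field absent (an older log): the default kicks in instead of an exception.
      val readTime = (json \ "Local Read Time").extractOpt[Long].getOrElse(0L)    // 0
      println(s"localBytes=$localBytes readTime=$readTime")
    }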
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index c181baf684..a2be724254 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -270,7 +270,6 @@ class JsonProtocolSuite extends FunSuite {
       .removeField { case (field, _) => field == "Local Read Time" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
     assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
-    assert(newMetrics.shuffleReadMetrics.get.localReadTime == 0)
   }
 
   test("SparkListenerApplicationStart backwards compatibility") {
@@ -708,7 +707,6 @@ class JsonProtocolSuite extends FunSuite {
       sr.incFetchWaitTime(a + d)
       sr.incRemoteBlocksFetched(f)
       sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1)
-      sr.incLocalReadTime(a + e)
       sr.incLocalBytesRead(a + f)
       t.setShuffleReadMetrics(Some(sr))
     }
@@ -956,7 +954,6 @@ class JsonProtocolSuite extends FunSuite {
       |      "Local Blocks Fetched": 700,
       |      "Fetch Wait Time": 900,
       |      "Remote Bytes Read": 1000,
-      |      "Local Read Time": 1000,
       |      "Local Bytes Read": 1100,
       |      "Total Records Read" : 10
       |    },
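
The test removed in the first suite hunk exercised that same fallback from the writer side: it stripped the field from a freshly serialized payload to simulate an old log, then asserted the deserialized metric defaulted to zero. In isolation, the removeField technique looks like this (json4s again; the payload is made up):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    object RemoveFieldDemo extends App {
      implicit val formats: Formats = DefaultFormats

      // Simulate an event log written before "Local Read Time" existed.
      val oldJson = parse("""{"Remote Bytes Read": 1000, "Local Read Time": 1000}""")
        .removeField { case (field, _) => field == "Local Read Time" }

      // A reader using extractOpt(...).getOrElse(0L) sees the default, not a crash.
      assert((oldJson \ "Local Read Time").extractOpt[Long].getOrElse(0L) == 0L)
      assert((oldJson \ "Remote Bytes Read").extractOpt[Long].getOrElse(0L) == 1000L)
    }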