diff options
5 files changed, 0 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index df36566bec..07b152651d 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -203,7 +203,6 @@ class TaskMetrics extends Serializable { merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) merged.incRemoteBytesRead(depMetrics.remoteBytesRead) merged.incLocalBytesRead(depMetrics.localBytesRead) - merged.incLocalReadTime(depMetrics.localReadTime) merged.incRecordsRead(depMetrics.recordsRead) } _shuffleReadMetrics = Some(merged) @@ -346,13 +345,6 @@ class ShuffleReadMetrics extends Serializable { private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value /** - * Time the task spent (in milliseconds) reading local shuffle blocks (from the local disk). - */ - private var _localReadTime: Long = _ - def localReadTime = _localReadTime - private[spark] def incLocalReadTime(value: Long) = _localReadTime += value - - /** * Shuffle data that was read from the local disk (as opposed to from a remote executor). */ private var _localBytesRead: Long = _ diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index f9fc8aa304..8aa528ac57 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -170,7 +170,6 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + " REMOTE_BYTES_READ=" + metrics.remoteBytesRead + - " LOCAL_READ_TIME=" + metrics.localReadTime + " LOCAL_BYTES_READ=" + metrics.localBytesRead case None => "" } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 2ebb79989d..8f28ef49a8 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -228,7 +228,6 @@ final class ShuffleBlockFetcherIterator( * track in-memory are the ManagedBuffer references themselves. */ private[this] def fetchLocalBlocks() { - val startTime = System.currentTimeMillis val iter = localBlocks.iterator while (iter.hasNext) { val blockId = iter.next() @@ -246,7 +245,6 @@ final class ShuffleBlockFetcherIterator( return } } - shuffleMetrics.incLocalReadTime(System.currentTimeMillis - startTime) } private[this] def initialize(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 58d37e2d66..8e20864db5 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -294,7 +294,6 @@ private[spark] object JsonProtocol { ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~ ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~ ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~ - ("Local Read Time" -> shuffleReadMetrics.localReadTime) ~ ("Local Bytes Read" -> shuffleReadMetrics.localBytesRead) ~ ("Total Records Read" -> shuffleReadMetrics.recordsRead) } @@ -676,7 +675,6 @@ private[spark] object JsonProtocol { metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int]) metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long]) metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long]) - metrics.incLocalReadTime((json \ "Local Read Time").extractOpt[Long].getOrElse(0)) metrics.incLocalBytesRead((json \ "Local Bytes Read").extractOpt[Long].getOrElse(0)) metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0)) metrics diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index c181baf684..a2be724254 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -270,7 +270,6 @@ class JsonProtocolSuite extends FunSuite { .removeField { case (field, _) => field == "Local Read Time" } val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0) - assert(newMetrics.shuffleReadMetrics.get.localReadTime == 0) } test("SparkListenerApplicationStart backwards compatibility") { @@ -708,7 +707,6 @@ class JsonProtocolSuite extends FunSuite { sr.incFetchWaitTime(a + d) sr.incRemoteBlocksFetched(f) sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1) - sr.incLocalReadTime(a + e) sr.incLocalBytesRead(a + f) t.setShuffleReadMetrics(Some(sr)) } @@ -956,7 +954,6 @@ class JsonProtocolSuite extends FunSuite { | "Local Blocks Fetched": 700, | "Fetch Wait Time": 900, | "Remote Bytes Read": 1000, - | "Local Read Time": 1000, | "Local Bytes Read": 1100, | "Total Records Read" : 10 | }, |