author     Kay Ousterhout <kayousterhout@gmail.com>    2015-02-25 14:55:24 -0800
committer  Kay Ousterhout <kayousterhout@gmail.com>    2015-02-25 14:55:24 -0800
commit     838a48036c050cef03b8c3620e16b5495cd7beab
tree       9525375a6711283a1c4b302360b777fa11296463
parent     9f603fce78fcc997926e9a72dec44d48cbc396fc
[SPARK-5982] Remove incorrect Local Read Time Metric
This metric is incomplete, because the files are memory mapped, so much of the read from disk occurs later as tasks actually read the file's data.

This should be merged into 1.3, so that we never expose this incorrect metric to users.

CC pwendell ksakellis sryza

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #4749 from kayousterhout/SPARK-5982 and squashes the following commits:

9737b5e [Kay Ousterhout] More fixes
a1eb300 [Kay Ousterhout] Removed one more use of local read time
cf13497 [Kay Ousterhout] [SPARK-5982] Remove incorrect Local Read Time Metric
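For context, a minimal standalone sketch (not Spark code; the file path is hypothetical) of why a timer wrapped around a memory-mapped read under-reports disk time: map() returns almost immediately, and the actual disk I/O happens later, as page faults, when the buffer is first read.

import java.io.RandomAccessFile
import java.nio.channels.FileChannel

object MmapTimingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical file standing in for an on-disk shuffle block.
    val file = new RandomAccessFile("/tmp/shuffle-block.data", "r")
    try {
      val channel = file.getChannel

      val start = System.currentTimeMillis
      // map() only establishes the mapping; it returns without reading the data.
      val buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())
      val mapTime = System.currentTimeMillis - start

      // The real disk reads happen here, as page faults, outside the timed
      // region above -- which is why a "local read time" measured around the
      // mapping step is misleading.
      var sum = 0L
      while (buffer.hasRemaining) sum += buffer.get()

      println(s"mapped in ${mapTime}ms, sum=$sum")
    } finally {
      file.close()
    }
  }
}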
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala                 | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala                  | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala  | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                    | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala               | 3
5 files changed, 0 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index df36566bec..07b152651d 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -203,7 +203,6 @@ class TaskMetrics extends Serializable {
       merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
       merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
       merged.incLocalBytesRead(depMetrics.localBytesRead)
-      merged.incLocalReadTime(depMetrics.localReadTime)
       merged.incRecordsRead(depMetrics.recordsRead)
     }
     _shuffleReadMetrics = Some(merged)
@@ -346,13 +345,6 @@ class ShuffleReadMetrics extends Serializable {
   private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
 
   /**
-   * Time the task spent (in milliseconds) reading local shuffle blocks (from the local disk).
-   */
-  private var _localReadTime: Long = _
-  def localReadTime = _localReadTime
-  private[spark] def incLocalReadTime(value: Long) = _localReadTime += value
-
-  /**
    * Shuffle data that was read from the local disk (as opposed to from a remote executor).
    */
   private var _localBytesRead: Long = _
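As an aside, the fields in ShuffleReadMetrics all follow the same accessor pattern as the surviving _localBytesRead above. A standalone sketch (hypothetical package and class names, not Spark's):

package org.example.metrics

// Sketch of the metric accessor pattern: a private mutable counter, a public
// read-only getter, and an incrementer whose visibility is restricted so that
// only framework code can mutate the value (Spark scopes it private[spark]).
class ReadMetricsSketch {
  private var _localBytesRead: Long = 0L
  def localBytesRead: Long = _localBytesRead
  private[metrics] def incLocalBytesRead(value: Long): Unit = _localBytesRead += value
}

Callers outside the package can read the metric but never write it, which keeps aggregation code such as the merge loop in TaskMetrics the single writer.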
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index f9fc8aa304..8aa528ac57 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -170,7 +170,6 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
" REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
" REMOTE_BYTES_READ=" + metrics.remoteBytesRead +
- " LOCAL_READ_TIME=" + metrics.localReadTime +
" LOCAL_BYTES_READ=" + metrics.localBytesRead
case None => ""
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 2ebb79989d..8f28ef49a8 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -228,7 +228,6 @@ final class ShuffleBlockFetcherIterator(
    * track in-memory are the ManagedBuffer references themselves.
    */
   private[this] def fetchLocalBlocks() {
-    val startTime = System.currentTimeMillis
     val iter = localBlocks.iterator
     while (iter.hasNext) {
       val blockId = iter.next()
@@ -246,7 +245,6 @@ final class ShuffleBlockFetcherIterator(
           return
       }
     }
-    shuffleMetrics.incLocalReadTime(System.currentTimeMillis - startTime)
   }
 
   private[this] def initialize(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 58d37e2d66..8e20864db5 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -294,7 +294,6 @@ private[spark] object JsonProtocol {
("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~
- ("Local Read Time" -> shuffleReadMetrics.localReadTime) ~
("Local Bytes Read" -> shuffleReadMetrics.localBytesRead) ~
("Total Records Read" -> shuffleReadMetrics.recordsRead)
}
@@ -676,7 +675,6 @@ private[spark] object JsonProtocol {
     metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
     metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
     metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
-    metrics.incLocalReadTime((json \ "Local Read Time").extractOpt[Long].getOrElse(0))
     metrics.incLocalBytesRead((json \ "Local Bytes Read").extractOpt[Long].getOrElse(0))
     metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0))
     metrics
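The extractOpt[...].getOrElse(...) calls kept above are what makes deserialization tolerant of event logs written before a field existed. A self-contained sketch of that pattern (using json4s, as JsonProtocol does; the field names are simply the ones from this diff):

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object OptionalFieldSketch {
  implicit val formats: Formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    // An old-style record written before "Local Bytes Read" existed.
    val oldJson = parse("""{"Remote Bytes Read": 1000}""")

    // extract[Long] would throw on the missing field; extractOpt returns
    // None, which is defaulted to 0 so old logs still load.
    val localBytesRead = (oldJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L)
    val remoteBytesRead = (oldJson \ "Remote Bytes Read").extract[Long]

    println(s"remote=$remoteBytesRead local=$localBytesRead")  // remote=1000 local=0
  }
}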
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index c181baf684..a2be724254 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -270,7 +270,6 @@ class JsonProtocolSuite extends FunSuite {
       .removeField { case (field, _) => field == "Local Read Time" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
     assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
-    assert(newMetrics.shuffleReadMetrics.get.localReadTime == 0)
   }
 
   test("SparkListenerApplicationStart backwards compatibility") {
@@ -708,7 +707,6 @@ class JsonProtocolSuite extends FunSuite {
       sr.incFetchWaitTime(a + d)
       sr.incRemoteBlocksFetched(f)
       sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1)
-      sr.incLocalReadTime(a + e)
       sr.incLocalBytesRead(a + f)
       t.setShuffleReadMetrics(Some(sr))
     }
@@ -956,7 +954,6 @@ class JsonProtocolSuite extends FunSuite {
| "Local Blocks Fetched": 700,
| "Fetch Wait Time": 900,
| "Remote Bytes Read": 1000,
- | "Local Read Time": 1000,
| "Local Bytes Read": 1100,
| "Total Records Read" : 10
| },