about | summary | refs | log | tree | commit | diff
path: root/core
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2014-09-20 16:03:17 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-09-20 16:03:17 -0700
commit7c8ad1c0838762f5b632f683834c88a711aef4dd (patch)
treefbbdf3ea9651d18187e37a66f3f831bbaac61a18 /core
parent5f8833c672ab64aa5886a8239ae2ff2a8ea42363 (diff)
downloadspark-7c8ad1c0838762f5b632f683834c88a711aef4dd.tar.gz
spark-7c8ad1c0838762f5b632f683834c88a711aef4dd.tar.bz2
spark-7c8ad1c0838762f5b632f683834c88a711aef4dd.zip
SPARK-3574. Shuffle finish time always reported as -1
The included test waits 100 ms after job completion for task completion events to come in so it can verify they have reasonable finish times. Does anyone know a better way to wait on listener events that are expected to come in? Author: Sandy Ryza <sandy@cloudera.com> Closes #2440 from sryza/sandy-spark-3574 and squashes the following commits: c81439b [Sandy Ryza] Fix test failure b340956 [Sandy Ryza] SPARK-3574. Remove shuffleFinishTime metric
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala    | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala     | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala       | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala  | 3
4 files changed, 0 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 99a88c1345..3e49b6235a 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -137,7 +137,6 @@ class TaskMetrics extends Serializable {
merged.localBlocksFetched += depMetrics.localBlocksFetched
merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
merged.remoteBytesRead += depMetrics.remoteBytesRead
- merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
}
_shuffleReadMetrics = Some(merged)
}
@@ -178,11 +177,6 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
@DeveloperApi
class ShuffleReadMetrics extends Serializable {
/**
- * Absolute time when this task finished reading shuffle data
- */
- var shuffleFinishTime: Long = -1
-
- /**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 4d6b5c8188..ceb434feb6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -171,7 +171,6 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
}
val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
case Some(metrics) =>
- " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c4dddb2d10..6a48f673c4 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -255,7 +255,6 @@ private[spark] object JsonProtocol {
}
def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
- ("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~
("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
@@ -590,7 +589,6 @@ private[spark] object JsonProtocol {
def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
val metrics = new ShuffleReadMetrics
- metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long]
metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 2b45d8b695..f1f88c5fd3 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -356,7 +356,6 @@ class JsonProtocolSuite extends FunSuite {
}
private def assertEquals(metrics1: ShuffleReadMetrics, metrics2: ShuffleReadMetrics) {
- assert(metrics1.shuffleFinishTime === metrics2.shuffleFinishTime)
assert(metrics1.remoteBlocksFetched === metrics2.remoteBlocksFetched)
assert(metrics1.localBlocksFetched === metrics2.localBlocksFetched)
assert(metrics1.fetchWaitTime === metrics2.fetchWaitTime)
@@ -568,7 +567,6 @@ class JsonProtocolSuite extends FunSuite {
t.inputMetrics = Some(inputMetrics)
} else {
val sr = new ShuffleReadMetrics
- sr.shuffleFinishTime = b + c
sr.remoteBytesRead = b + d
sr.localBlocksFetched = e
sr.fetchWaitTime = a + d
@@ -806,7 +804,6 @@ class JsonProtocolSuite extends FunSuite {
| "Memory Bytes Spilled": 800,
| "Disk Bytes Spilled": 0,
| "Shuffle Read Metrics": {
- | "Shuffle Finish Time": 900,
| "Remote Blocks Fetched": 800,
| "Local Blocks Fetched": 700,
| "Fetch Wait Time": 900,