aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Kay Ousterhout <kayousterhout@gmail.com> 2015-02-09 21:22:09 -0800
committer: Andrew Or <andrew@databricks.com> 2015-02-09 21:22:09 -0800
commit: a2d33d0b01af87e931d9d883638a52d7a86f6248 (patch)
tree: f003c4f46d1e26a50aadaf0eae379bab1f9a46bc
parent: a95ed52157473fb0e42e910ee15270e7f0edf943 (diff)
download: spark-a2d33d0b01af87e931d9d883638a52d7a86f6248.tar.gz
spark-a2d33d0b01af87e931d9d883638a52d7a86f6248.tar.bz2
spark-a2d33d0b01af87e931d9d883638a52d7a86f6248.zip
[SPARK-5701] Only set ShuffleReadMetrics when task has shuffle deps
The updateShuffleReadMetrics method in TaskMetrics (called by the executor heartbeater) currently always adds a ShuffleReadMetrics to TaskMetrics (with values set to 0), even when the task didn't read any shuffle data. ShuffleReadMetrics should only be added if the task reads shuffle data.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #4488 from kayousterhout/SPARK-5701 and squashes the following commits:

673ed58 [Kay Ousterhout] SPARK-5701: Only set ShuffleReadMetrics when task has shuffle deps
-rw-r--r-- core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 22
-rw-r--r-- core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala 28
2 files changed, 40 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index d05659193b..bf3f1e4fc7 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -177,8 +177,8 @@ class TaskMetrics extends Serializable {
* Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
* we can store all the different inputMetrics (one per readMethod).
*/
- private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod):
- InputMetrics =synchronized {
+ private[spark] def getInputMetricsForReadMethod(
+ readMethod: DataReadMethod): InputMetrics = synchronized {
_inputMetrics match {
case None =>
val metrics = new InputMetrics(readMethod)
@@ -195,15 +195,17 @@ class TaskMetrics extends Serializable {
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
private[spark] def updateShuffleReadMetrics(): Unit = synchronized {
- val merged = new ShuffleReadMetrics()
- for (depMetrics <- depsShuffleReadMetrics) {
- merged.incFetchWaitTime(depMetrics.fetchWaitTime)
- merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
- merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
- merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
- merged.incRecordsRead(depMetrics.recordsRead)
+ if (!depsShuffleReadMetrics.isEmpty) {
+ val merged = new ShuffleReadMetrics()
+ for (depMetrics <- depsShuffleReadMetrics) {
+ merged.incFetchWaitTime(depMetrics.fetchWaitTime)
+ merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
+ merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
+ merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
+ merged.incRecordsRead(depMetrics.recordsRead)
+ }
+ _shuffleReadMetrics = Some(merged)
}
- _shuffleReadMetrics = Some(merged)
}
private[spark] def updateInputMetrics(): Unit = synchronized {
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
new file mode 100644
index 0000000000..326e203afe
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.scalatest.FunSuite
+
+class TaskMetricsSuite extends FunSuite {
+ test("[SPARK-5701] updateShuffleReadMetrics: ShuffleReadMetrics not added when no shuffle deps") {
+ val taskMetrics = new TaskMetrics()
+ taskMetrics.updateShuffleReadMetrics()
+ assert(taskMetrics.shuffleReadMetrics.isEmpty)
+ }
+}