diff options
author | Kay Ousterhout <kayousterhout@gmail.com> | 2015-02-09 21:22:09 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-02-09 21:22:09 -0800 |
commit | a2d33d0b01af87e931d9d883638a52d7a86f6248 (patch) | |
tree | f003c4f46d1e26a50aadaf0eae379bab1f9a46bc | |
parent | a95ed52157473fb0e42e910ee15270e7f0edf943 (diff) | |
download | spark-a2d33d0b01af87e931d9d883638a52d7a86f6248.tar.gz spark-a2d33d0b01af87e931d9d883638a52d7a86f6248.tar.bz2 spark-a2d33d0b01af87e931d9d883638a52d7a86f6248.zip |
[SPARK-5701] Only set ShuffleReadMetrics when task has shuffle deps
The updateShuffleReadMetrics method in TaskMetrics (called by the executor heartbeater) will currently always add a ShuffleReadMetrics to TaskMetrics (with values set to 0), even when the task didn't read any shuffle data. ShuffleReadMetrics should only be added if the task reads shuffle data.
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes #4488 from kayousterhout/SPARK-5701 and squashes the following commits:
673ed58 [Kay Ousterhout] SPARK-5701: Only set ShuffleReadMetrics when task has shuffle deps
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 22 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala | 28 |
2 files changed, 40 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index d05659193b..bf3f1e4fc7 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -177,8 +177,8 @@ class TaskMetrics extends Serializable { * Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed, * we can store all the different inputMetrics (one per readMethod). */ - private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod): - InputMetrics =synchronized { + private[spark] def getInputMetricsForReadMethod( + readMethod: DataReadMethod): InputMetrics = synchronized { _inputMetrics match { case None => val metrics = new InputMetrics(readMethod) @@ -195,15 +195,17 @@ class TaskMetrics extends Serializable { * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics. */ private[spark] def updateShuffleReadMetrics(): Unit = synchronized { - val merged = new ShuffleReadMetrics() - for (depMetrics <- depsShuffleReadMetrics) { - merged.incFetchWaitTime(depMetrics.fetchWaitTime) - merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) - merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) - merged.incRemoteBytesRead(depMetrics.remoteBytesRead) - merged.incRecordsRead(depMetrics.recordsRead) + if (!depsShuffleReadMetrics.isEmpty) { + val merged = new ShuffleReadMetrics() + for (depMetrics <- depsShuffleReadMetrics) { + merged.incFetchWaitTime(depMetrics.fetchWaitTime) + merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) + merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) + merged.incRemoteBytesRead(depMetrics.remoteBytesRead) + merged.incRecordsRead(depMetrics.recordsRead) + } + _shuffleReadMetrics = Some(merged) } - _shuffleReadMetrics = Some(merged) } private[spark] def updateInputMetrics(): Unit = synchronized { diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala new file mode 100644 index 0000000000..326e203afe --- /dev/null +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.scalatest.FunSuite + +class TaskMetricsSuite extends FunSuite { + test("[SPARK-5701] updateShuffleReadMetrics: ShuffleReadMetrics not added when no shuffle deps") { + val taskMetrics = new TaskMetrics() + taskMetrics.updateShuffleReadMetrics() + assert(taskMetrics.shuffleReadMetrics.isEmpty) + } +} |