aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-09-11 18:53:26 -0700
committerAndrew Or <andrewor14@gmail.com>2014-09-11 18:53:26 -0700
commit42904b8d013e71d03e301c3da62e33b4cc2eb54e (patch)
tree0da8792f140e3eeacea953daf263f7f38b8934cf
parent33c7a738ae9f2d12425afad6f08a4fe0b7a5c6ab (diff)
downloadspark-42904b8d013e71d03e301c3da62e33b4cc2eb54e.tar.gz
spark-42904b8d013e71d03e301c3da62e33b4cc2eb54e.tar.bz2
spark-42904b8d013e71d03e301c3da62e33b4cc2eb54e.zip
[SPARK-3465] fix task metrics aggregation in local mode
Before overwrite t.taskMetrics, take a deepcopy of it. Author: Davies Liu <davies.liu@gmail.com> Closes #2338 from davies/fix_metric and squashes the following commits: a5cdb63 [Davies Liu] Merge branch 'master' into fix_metric 7c879e0 [Davies Liu] add more comments 754b5b8 [Davies Liu] copy taskMetrics only when isLocal is true 5ca26dc [Davies Liu] fix task metrics aggregation in local mode
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala11
1 files changed, 10 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dd903dc65d..acae448a9c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -360,7 +360,16 @@ private[spark] class Executor(
if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
- tasksMetrics += ((taskRunner.taskId, metrics))
+ if (isLocal) {
+ // JobProgressListener will hold an reference of it during
+ // onExecutorMetricsUpdate(), then JobProgressListener can not see
+ // the changes of metrics any more, so make a deep copy of it
+ val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
+ tasksMetrics += ((taskRunner.taskId, copiedMetrics))
+ } else {
+ // It will be copied by serialization
+ tasksMetrics += ((taskRunner.taskId, metrics))
+ }
}
}
}