diff options
author | Davies Liu <davies.liu@gmail.com> | 2014-09-11 18:53:26 -0700 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-09-11 18:53:26 -0700 |
commit | 42904b8d013e71d03e301c3da62e33b4cc2eb54e (patch) | |
tree | 0da8792f140e3eeacea953daf263f7f38b8934cf /core/src/main | |
parent | 33c7a738ae9f2d12425afad6f08a4fe0b7a5c6ab (diff) | |
download | spark-42904b8d013e71d03e301c3da62e33b4cc2eb54e.tar.gz spark-42904b8d013e71d03e301c3da62e33b4cc2eb54e.tar.bz2 spark-42904b8d013e71d03e301c3da62e33b4cc2eb54e.zip |
[SPARK-3465] fix task metrics aggregation in local mode
Before overwrite t.taskMetrics, take a deepcopy of it.
Author: Davies Liu <davies.liu@gmail.com>
Closes #2338 from davies/fix_metric and squashes the following commits:
a5cdb63 [Davies Liu] Merge branch 'master' into fix_metric
7c879e0 [Davies Liu] add more comments
754b5b8 [Davies Liu] copy taskMetrics only when isLocal is true
5ca26dc [Davies Liu] fix task metrics aggregation in local mode
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/Executor.scala | 11 |
1 files changed, 10 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index dd903dc65d..acae448a9c 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -360,7 +360,16 @@ private[spark] class Executor( if (!taskRunner.attemptedTask.isEmpty) { Option(taskRunner.task).flatMap(_.metrics).foreach { metrics => metrics.updateShuffleReadMetrics - tasksMetrics += ((taskRunner.taskId, metrics)) + if (isLocal) { + // JobProgressListener will hold an reference of it during + // onExecutorMetricsUpdate(), then JobProgressListener can not see + // the changes of metrics any more, so make a deep copy of it + val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics)) + tasksMetrics += ((taskRunner.taskId, copiedMetrics)) + } else { + // It will be copied by serialization + tasksMetrics += ((taskRunner.taskId, metrics)) + } } } } |