diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-04-21 01:06:22 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-21 01:06:22 -0700 |
commit | cb51680d2213ef3443d1c02930c1e76fe6eb2e31 (patch) | |
tree | c80396f74f3ea04681932ca6de23c7b77c60f3b8 /sql | |
parent | 228128ce2571c3058cb13ae502a5328745eeeef5 (diff) | |
download | spark-cb51680d2213ef3443d1c02930c1e76fe6eb2e31.tar.gz spark-cb51680d2213ef3443d1c02930c1e76fe6eb2e31.tar.bz2 spark-cb51680d2213ef3443d1c02930c1e76fe6eb2e31.zip |
[SPARK-14753][CORE] remove internal flag in Accumulable
## What changes were proposed in this pull request?
the `Accumulable.internal` flag is only used to avoid registering internal accumulators for 2 certain cases:
1. `TaskMetrics.createTempShuffleReadMetrics`: the accumulators in the temp shuffle read metrics should not be registered.
2. `TaskMetrics.fromAccumulatorUpdates`: the created task metrics is only used to post event, accumulators inside it should not be registered.
For 1, we can create a `TempShuffleReadMetrics` that don't create accumulators, just keep the data and merge it at last.
For 2, we can un-register these accumulators immediately.
TODO: remove `internal` flag in `AccumulableInfo` with followup PR
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12525 from cloud-fan/acc.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala | 5 |
1 files changed, 2 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 7fa1390729..92c31eac95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -29,12 +29,11 @@ import org.apache.spark.util.Utils */ private[sql] abstract class SQLMetric[R <: SQLMetricValue[T], T]( name: String, - val param: SQLMetricParam[R, T]) - extends Accumulable[R, T](param.zero, param, Some(name), internal = true) { + val param: SQLMetricParam[R, T]) extends Accumulable[R, T](param.zero, param, Some(name)) { // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { - new AccumulableInfo(id, Some(name), update, value, isInternal, countFailedValues, + new AccumulableInfo(id, Some(name), update, value, true, countFailedValues, Some(SQLMetrics.ACCUM_IDENTIFIER)) } |