diff options
author | Andrew Or <andrew@databricks.com> | 2016-01-19 10:58:51 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-01-19 10:58:51 -0800 |
commit | b122c861cd72b580334a7532f0a52c0439552bdf (patch) | |
tree | 9a3c7e9f3b1c190c7914c89e061c7fa15911a846 /sql/core | |
parent | e14817b528ccab4b4685b45a95e2325630b5fc53 (diff) | |
download | spark-b122c861cd72b580334a7532f0a52c0439552bdf.tar.gz spark-b122c861cd72b580334a7532f0a52c0439552bdf.tar.bz2 spark-b122c861cd72b580334a7532f0a52c0439552bdf.zip |
[SPARK-12887] Do not expose var's in TaskMetrics
This is a step in implementing SPARK-10620, which migrates TaskMetrics to accumulators.
TaskMetrics has a bunch of var's, some are fully public, some are `private[spark]`. This is bad coding style that makes it easy to accidentally overwrite previously set metrics. This has happened a few times in the past and caused bugs that were difficult to debug.
Instead, we should have get-or-create semantics, which are more readily understandable. This makes sense in the case of TaskMetrics because these are just aggregated metrics that we want to collect throughout the task, so it doesn't matter who's incrementing them.
Parent PR: #10717
Author: Andrew Or <andrew@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Author: andrewor14 <andrew@databricks.com>
Closes #10815 from andrewor14/get-or-create-metrics.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala | 3 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala | 1 |
2 files changed, 1 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala index d45d2db62f..8222b84d33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala @@ -126,8 +126,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( logInfo("Input split: " + split.serializableHadoopSplit) val conf = getConf(isDriverSide = false) - val inputMetrics = context.taskMetrics - .getInputMetricsForReadMethod(DataReadMethod.Hadoop) + val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop) // Sets the thread local variable for the file's name split.serializableHadoopSplit.value match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index 9f09eb4429..7438e11ef7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ -127,7 +127,6 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext { assert(sorter.numSpills > 0) // Merging spilled files should not throw assertion error - taskContext.taskMetrics.shuffleWriteMetrics = Some(new ShuffleWriteMetrics) sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile) } { // Clean up |