diff options
author | Andrew Or <andrew@databricks.com> | 2016-01-29 18:03:04 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-01-29 18:03:08 -0800 |
commit | 12252d1da90fa7d2dffa3a7c249ecc8821dee130 (patch) | |
tree | afac517a71e5639ba7796d55a3339167dd5a4f05 /sql | |
parent | 70e69fc4dd619654f5d24b8b84f6a94f7705c59b (diff) | |
download | spark-12252d1da90fa7d2dffa3a7c249ecc8821dee130.tar.gz spark-12252d1da90fa7d2dffa3a7c249ecc8821dee130.tar.bz2 spark-12252d1da90fa7d2dffa3a7c249ecc8821dee130.zip |
[SPARK-13071] Coalescing HadoopRDD overwrites existing input metrics
This issue is causing tests to fail consistently in master with Hadoop 2.6 / 2.7. This is because for Hadoop 2.5+ we overwrite existing values of `InputMetrics#bytesRead` in each call to `HadoopRDD#compute`. In the case of coalesce, e.g.
```
sc.textFile(..., 4).coalesce(2).count()
```
we will call `compute` multiple times in the same task, overwriting `bytesRead` values from previous calls to `compute`.
For a regression test, see `InputOutputMetricsSuite.input metrics for old hadoop with coalesce`. I did not add a new regression test because it's impossible without significant refactoring; there's a lot of existing duplicate code in this corner of Spark.
This was caused by #10835.
Author: Andrew Or <andrew@databricks.com>
Closes #10973 from andrewor14/fix-input-metrics-coalesce.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala | 7 |
1 files changed, 6 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala index edd87c2d8e..9703b16c86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala @@ -127,6 +127,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( val conf = getConf(isDriverSide = false) val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop) + val existingBytesRead = inputMetrics.bytesRead // Sets the thread local variable for the file's name split.serializableHadoopSplit.value match { @@ -142,9 +143,13 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( case _ => None } + // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics. + // If we do a coalesce, however, we are likely to compute multiple partitions in the same + // task and in the same thread, in which case we need to avoid override values written by + // previous partitions (SPARK-13071). def updateBytesRead(): Unit = { getBytesReadCallback.foreach { getBytesRead => - inputMetrics.setBytesRead(getBytesRead()) + inputMetrics.setBytesRead(existingBytesRead + getBytesRead()) } } |