author     Andrew Or <andrew@databricks.com>   2016-01-29 18:03:04 -0800
committer  Andrew Or <andrew@databricks.com>   2016-01-29 18:03:08 -0800
commit     12252d1da90fa7d2dffa3a7c249ecc8821dee130 (patch)
tree       afac517a71e5639ba7796d55a3339167dd5a4f05 /sql
parent     70e69fc4dd619654f5d24b8b84f6a94f7705c59b (diff)
[SPARK-13071] Coalescing HadoopRDD overwrites existing input metrics
This issue is causing tests to fail consistently in master with Hadoop 2.6 / 2.7. This is because for Hadoop 2.5+ we overwrite existing values of `InputMetrics#bytesRead` in each call to `HadoopRDD#compute`. In the case of coalesce, e.g.

```
sc.textFile(..., 4).coalesce(2).count()
```

we will call `compute` multiple times in the same task, overwriting `bytesRead` values from previous calls to `compute`.

For a regression test, see `InputOutputMetricsSuite.input metrics for old hadoop with coalesce`. I did not add a new regression test because it's impossible without significant refactoring; there's a lot of existing duplicate code in this corner of Spark.

This was caused by #10835.

Author: Andrew Or <andrew@databricks.com>

Closes #10973 from andrewor14/fix-input-metrics-coalesce.
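To make the failure mode concrete, here is a minimal, self-contained Scala sketch. `InputMetricsStub` and `compute` are hypothetical stand-ins, not Spark's real classes; the sketch only contrasts the overwrite semantics described above with the accumulate semantics this patch introduces.

```scala
object OverwriteVsAccumulate {
  final class InputMetricsStub { var bytesRead: Long = 0L }

  // One call per partition; `partitionBytes` stands in for the bytes the
  // thread-local Hadoop FileSystem statistics report for that partition.
  def compute(metrics: InputMetricsStub, partitionBytes: Long, accumulate: Boolean): Unit = {
    val existing = metrics.bytesRead
    metrics.bytesRead = if (accumulate) existing + partitionBytes else partitionBytes
  }

  def main(args: Array[String]): Unit = {
    val buggy = new InputMetricsStub
    compute(buggy, 100L, accumulate = false)
    compute(buggy, 200L, accumulate = false) // overwrites the first partition's 100
    println(buggy.bytesRead)                 // 200 -- first partition's bytes lost

    val fixed = new InputMetricsStub
    compute(fixed, 100L, accumulate = true)
    compute(fixed, 200L, accumulate = true)  // adds to the prior total instead
    println(fixed.bytesRead)                 // 300 -- both partitions counted
  }
}
```

With two coalesced partitions of 100 and 200 bytes in one task, overwrite semantics report 200 while accumulate semantics report the correct 300.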
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index edd87c2d8e..9703b16c86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -127,6 +127,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
       val conf = getConf(isDriverSide = false)
 
       val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+      val existingBytesRead = inputMetrics.bytesRead
 
       // Sets the thread local variable for the file's name
       split.serializableHadoopSplit.value match {
@@ -142,9 +143,13 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
         case _ => None
       }
 
+      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+      // task and in the same thread, in which case we need to avoid overwriting values written
+      // by previous partitions (SPARK-13071).
       def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(getBytesRead())
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
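For illustration, the patched update pattern can be sketched outside Spark as follows. `InputMetricsStub` and the callback plumbing are hypothetical stand-ins for Spark's `InputMetrics` and the thread-local FileSystem statistics callback; only the `existingBytesRead + getBytesRead()` arithmetic is taken from the diff above.

```scala
object UpdateBytesReadSketch {
  final class InputMetricsStub {
    private var _bytesRead: Long = 0L
    def bytesRead: Long = _bytesRead
    def setBytesRead(v: Long): Unit = _bytesRead = v
  }

  def main(args: Array[String]): Unit = {
    val inputMetrics = new InputMetricsStub

    def computePartition(bytesInPartition: Long): Unit = {
      // Snapshot what earlier partitions in this task already recorded (the fix).
      val existingBytesRead = inputMetrics.bytesRead
      // Stand-in for the thread-local statistics callback: reports bytes read
      // since this compute call began, i.e. for the current partition only.
      val getBytesReadCallback: Option[() => Long] = Some(() => bytesInPartition)

      def updateBytesRead(): Unit = {
        getBytesReadCallback.foreach { getBytesRead =>
          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
        }
      }

      updateBytesRead()
    }

    // Two partitions computed in the same task and thread, as with coalesce.
    computePartition(100L)
    computePartition(200L)
    println(inputMetrics.bytesRead) // 300; without the snapshot it would be 200
  }
}
```

The key design point is that the snapshot is taken once per `compute` call, before any records are read, so each partition's delta is added on top of whatever earlier partitions in the same task already recorded.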