author     Andrew Or <andrew@databricks.com>    2016-01-29 18:03:04 -0800
committer  Andrew Or <andrew@databricks.com>    2016-01-29 18:03:08 -0800
commit     12252d1da90fa7d2dffa3a7c249ecc8821dee130 (patch)
tree       afac517a71e5639ba7796d55a3339167dd5a4f05 /core
parent     70e69fc4dd619654f5d24b8b84f6a94f7705c59b (diff)
[SPARK-13071] Coalescing HadoopRDD overwrites existing input metrics
This issue is causing tests to fail consistently in master with Hadoop 2.6 / 2.7. This is because for Hadoop 2.5+ we overwrite existing values of `InputMetrics#bytesRead` in each call to `HadoopRDD#compute`. In the case of coalesce, e.g.

```
sc.textFile(..., 4).coalesce(2).count()
```

we will call `compute` multiple times in the same task, overwriting `bytesRead` values from previous calls to `compute`.

For a regression test, see `InputOutputMetricsSuite.input metrics for old hadoop with coalesce`. I did not add a new regression test because it's impossible without significant refactoring; there's a lot of existing duplicate code in this corner of Spark.

This was caused by #10835.

Author: Andrew Or <andrew@databricks.com>

Closes #10973 from andrewor14/fix-input-metrics-coalesce.
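To illustrate the accumulation pattern the patch switches to, here is a minimal, self-contained sketch. The names below (`InputMetricsModel`, the local `compute` helper, the byte counts) are hypothetical stand-ins, not Spark's actual `InputMetrics` or `HadoopRDD` code; the callback models the per-partition delta reported by the thread-local Hadoop FileSystem statistics.

```scala
// Hypothetical, simplified model of the bytesRead bookkeeping; not Spark's real classes.
class InputMetricsModel {
  private var _bytesRead: Long = 0L
  def bytesRead: Long = _bytesRead
  def setBytesRead(v: Long): Unit = { _bytesRead = v }
}

object CoalesceOverwriteSketch {
  def main(args: Array[String]): Unit = {
    val inputMetrics = new InputMetricsModel

    // Models one HadoopRDD#compute call for a single partition. The callback stands in
    // for the thread-local FileSystem statistics delta observed while reading this partition.
    def compute(partitionBytes: Long): Unit = {
      val existingBytesRead = inputMetrics.bytesRead               // the fix: remember the prior value
      val getBytesRead: () => Long = () => partitionBytes
      inputMetrics.setBytesRead(existingBytesRead + getBytesRead()) // accumulate rather than overwrite
    }

    // coalesce(2) over 4 input partitions => multiple compute calls in one task/thread.
    compute(100L)
    compute(250L)
    assert(inputMetrics.bytesRead == 350L) // without existingBytesRead this would read only 250
  }
}
```

Before the patch, the second `compute` call would leave `bytesRead` at the last partition's value alone, which is the symptom behind the failing Hadoop 2.6 / 2.7 tests.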
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala    | 7 ++++++-
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 7 ++++++-
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 3204e6adce..e2ebd7f00d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -215,6 +215,7 @@ class HadoopRDD[K, V](
// TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD
val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+ val existingBytesRead = inputMetrics.bytesRead
// Sets the thread local variable for the file's name
split.inputSplit.value match {
@@ -230,9 +231,13 @@ class HadoopRDD[K, V](
case _ => None
}
+ // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+ // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+ // task and in the same thread, in which case we need to avoid overriding values written by
+ // previous partitions (SPARK-13071).
def updateBytesRead(): Unit = {
getBytesReadCallback.foreach { getBytesRead =>
- inputMetrics.setBytesRead(getBytesRead())
+ inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 4d2816e335..e71d3405c0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -130,6 +130,7 @@ class NewHadoopRDD[K, V](
val conf = getConf
val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+ val existingBytesRead = inputMetrics.bytesRead
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
@@ -139,9 +140,13 @@ class NewHadoopRDD[K, V](
case _ => None
}
+ // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+ // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+ // task and in the same thread, in which case we need to avoid overriding values written by
+ // previous partitions (SPARK-13071).
def updateBytesRead(): Unit = {
getBytesReadCallback.foreach { getBytesRead =>
- inputMetrics.setBytesRead(getBytesRead())
+ inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
}
}
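For completeness, the triggering scenario from the commit message can be driven end to end roughly as follows. This is a sketch rather than the actual `InputOutputMetricsSuite` test: the `inputMetrics` accessor shown matches the Spark 2.x API (in 1.6 it is an `Option`), the input path is arbitrary, and the sleep is a crude stand-in for properly draining the listener bus.

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

object CoalesceInputMetricsCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("SPARK-13071 check"))

    val totalBytesRead = new AtomicLong(0L)
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // Spark 2.x shape; older versions expose inputMetrics as an Option.
        totalBytesRead.addAndGet(taskEnd.taskMetrics.inputMetrics.bytesRead)
      }
    })

    // Four input partitions coalesced into two: each task calls compute() twice,
    // which is exactly the case where bytesRead used to be overwritten.
    sc.textFile("README.md", 4).coalesce(2).count()

    Thread.sleep(1000) // crude wait for listener events; a real test drains the bus instead
    println(s"bytes read reported by tasks: ${totalBytesRead.get()}")
    sc.stop()
  }
}
```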