Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala    | 7 ++++++-
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 7 ++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 3204e6adce..e2ebd7f00d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -215,6 +215,7 @@ class HadoopRDD[K, V](
 
       // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD
       val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+      val existingBytesRead = inputMetrics.bytesRead
 
       // Sets the thread local variable for the file's name
       split.inputSplit.value match {
@@ -230,9 +231,13 @@ class HadoopRDD[K, V](
         case _ => None
       }
 
+      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+      // task and in the same thread, in which case we need to avoid overriding values written by
+      // previous partitions (SPARK-13071).
       def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(getBytesRead())
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
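
To make the effect of the fix concrete, here is a minimal, runnable sketch of the accumulation pattern; InputMetrics and readPartition below are simplified hypothetical stand-ins, not Spark's actual classes. The identical change is applied to NewHadoopRDD in the second half of this patch.

// Sketch of the SPARK-13071 fix: snapshot the task's existing bytesRead before
// reading a partition, then add the per-partition count to it instead of
// overwriting. InputMetrics and readPartition are hypothetical stand-ins.
object BytesReadAccumulationSketch {
  class InputMetrics {
    private var _bytesRead = 0L
    def bytesRead: Long = _bytesRead
    def setBytesRead(v: Long): Unit = _bytesRead = v
  }

  // Simulates computing one partition. The callback, like the thread-local
  // Hadoop FileSystem statistics callback, reports only this partition's bytes.
  def readPartition(metrics: InputMetrics, partitionBytes: Long): Unit = {
    val existingBytesRead = metrics.bytesRead // snapshot before reading (the fix)
    val getBytesRead: () => Long = () => partitionBytes
    metrics.setBytesRead(existingBytesRead + getBytesRead())
  }

  def main(args: Array[String]): Unit = {
    val metrics = new InputMetrics
    readPartition(metrics, 100L) // first partition of a coalesced task
    readPartition(metrics, 50L)  // second partition, same task and thread
    // Without the snapshot, the second call would overwrite 100 with 50.
    assert(metrics.bytesRead == 150L)
    println(s"bytesRead = ${metrics.bytesRead}")
  }
}
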
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 4d2816e335..e71d3405c0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -130,6 +130,7 @@ class NewHadoopRDD[K, V](
       val conf = getConf
 
       val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+      val existingBytesRead = inputMetrics.bytesRead
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
@@ -139,9 +140,13 @@ class NewHadoopRDD[K, V](
         case _ => None
       }
 
+      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+      // task and in the same thread, in which case we need to avoid overriding values written by
+      // previous partitions (SPARK-13071).
       def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(getBytesRead())
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
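
The NewHadoopRDD change mirrors the HadoopRDD one exactly. The design hinges on the bytes-read callback being created per partition and reporting only bytes read since its own creation, so adding the bytesRead snapshot taken at iterator construction turns setBytesRead into a running total across every partition a coalesced task computes, rather than letting each partition clobber its predecessors' counts.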