path: root/core/src
author     Sandy Ryza <sandy@cloudera.com>          2014-11-03 15:19:01 -0800
committer  Patrick Wendell <pwendell@gmail.com>     2014-11-03 15:19:01 -0800
commit     28128150e7e0c2b7d1c483e67214bdaef59f7d75 (patch)
tree       0140ea27eb62584f190ce939549c003b231bd2ce /core/src
parent     25bef7e6951301e93004567fc0cef96bf8d1a224 (diff)
SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader instantiation

Author: Sandy Ryza <sandy@cloudera.com>

Closes #3045 from sryza/sandy-spark-4178 and squashes the following commits:

8d2e70e [Sandy Ryza] Kostas's review feedback
e5b27c0 [Sandy Ryza] SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader instantiation
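In short, the patch defers RecordReader creation until after the FileSystem bytes-read callback has been captured, in both HadoopRDD and NewHadoopRDD, because a RecordReader's constructor (and, for the new API, initialize()) may already read from the FileSystem. A minimal sketch of the resulting ordering for the old mapred API, using a hypothetical helper name (the real code is in HadoopRDD.compute and NewHadoopRDD.compute, shown in the diff below):

import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf, RecordReader, Reporter}

// Hypothetical helper illustrating the ordering this patch enforces: capture the
// per-thread bytes-read callback *before* the RecordReader is constructed, so any
// bytes read during construction are attributed to the task's input metrics.
def openWithMetrics[K, V](
    inputFormat: InputFormat[K, V],
    split: FileSplit,
    jobConf: JobConf,
    getBytesReadCallback: (FileSplit, JobConf) => Option[() => Long])
  : (RecordReader[K, V], Option[() => Long]) = {
  // 1. Snapshot the per-thread FileSystem statistics first.
  val bytesReadCallback = getBytesReadCallback(split, jobConf)
  // 2. Only then instantiate the RecordReader; its constructor may already read
  //    bytes (e.g. opening the file and buffering the first records).
  val reader = inputFormat.getRecordReader(split, jobConf, Reporter.NULL)
  (reader, bytesReadCallback)
}

Calling the returned callback at any later point then reports all bytes read on the thread, including those read while the reader was being constructed.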
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala              | 25
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala           | 26
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala  | 27
3 files changed, 53 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 946fb5616d..a157e36e22 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -211,20 +211,11 @@ class HadoopRDD[K, V](
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
- var reader: RecordReader[K, V] = null
val jobConf = getJobConf()
- val inputFormat = getInputFormat(jobConf)
- HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
- context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
- reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
-
- // Register an on-task-completion callback to close the input stream.
- context.addTaskCompletionListener{ context => closeIfNeeded() }
- val key: K = reader.createKey()
- val value: V = reader.createValue()
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- // Find a function that will return the FileSystem bytes read by this thread.
+ // Find a function that will return the FileSystem bytes read by this thread. Do this before
+ // creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
@@ -234,6 +225,18 @@ class HadoopRDD[K, V](
if (bytesReadCallback.isDefined) {
context.taskMetrics.inputMetrics = Some(inputMetrics)
}
+
+ var reader: RecordReader[K, V] = null
+ val inputFormat = getInputFormat(jobConf)
+ HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
+ context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
+ reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
+
+ // Register an on-task-completion callback to close the input stream.
+ context.addTaskCompletionListener{ context => closeIfNeeded() }
+ val key: K = reader.createKey()
+ val value: V = reader.createValue()
+
var recordsSinceMetricsUpdate = 0
override def getNext() = {
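For reference, a rough sketch of what a "bytes read on this thread" callback could look like, assuming Hadoop 2.5+ where FileSystem.Statistics exposes per-thread counters. The lookup actually used in the diff is SparkHadoopUtil.get.getFSBytesReadOnThreadCallback, whose implementation may differ (it also has to cope with Hadoop versions that lack per-thread statistics):

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative only: sum the per-thread bytes-read counters for the FileSystem
// scheme that backs the split's path (e.g. "hdfs" or "file").
def fsBytesReadOnThreadCallback(path: Path, conf: Configuration): () => Long = {
  val scheme = path.getFileSystem(conf).getUri.getScheme
  () => {
    FileSystem.getAllStatistics.asScala
      .filter(_.getScheme == scheme)
      .map(_.getThreadStatistics.getBytesRead)
      .sum
  }
}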
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6d6b86721c..351e145f96 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -107,20 +107,10 @@ class NewHadoopRDD[K, V](
val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
- val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
- val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
- val format = inputFormatClass.newInstance
- format match {
- case configurable: Configurable =>
- configurable.setConf(conf)
- case _ =>
- }
- val reader = format.createRecordReader(
- split.serializableHadoopSplit.value, hadoopAttemptContext)
- reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- // Find a function that will return the FileSystem bytes read by this thread.
+ // Find a function that will return the FileSystem bytes read by this thread. Do this before
+ // creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
@@ -131,6 +121,18 @@ class NewHadoopRDD[K, V](
context.taskMetrics.inputMetrics = Some(inputMetrics)
}
+ val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+ val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+ val format = inputFormatClass.newInstance
+ format match {
+ case configurable: Configurable =>
+ configurable.setConf(conf)
+ case _ =>
+ }
+ val reader = format.createRecordReader(
+ split.serializableHadoopSplit.value, hadoopAttemptContext)
+ reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => close())
var havePair = false
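The NewHadoopRDD change mirrors the one above, but for the new mapreduce API, where it is typically RecordReader.initialize() that opens the input stream and seeks to the split. A hypothetical helper sketching that ordering (the names and the callback-resolution parameter are illustrative, not Spark API):

import org.apache.hadoop.mapreduce.{InputFormat, InputSplit, RecordReader, TaskAttemptContext}

// Resolve the bytes-read callback before any input is opened, then create and
// initialize the reader, so bytes read by initialize() are also counted.
def openNewApiReader[K, V](
    format: InputFormat[K, V],
    split: InputSplit,
    context: TaskAttemptContext,
    resolveBytesReadCallback: () => Option[() => Long])
  : (RecordReader[K, V], Option[() => Long]) = {
  // 1. Capture the per-thread bytes-read callback first.
  val bytesReadCallback = resolveBytesReadCallback()
  // 2. Only then create and initialize the reader.
  val reader = format.createRecordReader(split, context)
  reader.initialize(split, context)
  (reader, bytesReadCallback)
}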
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
index 33bd1afea2..48c386ba04 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
@@ -27,7 +27,7 @@ import scala.collection.mutable.ArrayBuffer
import java.io.{FileWriter, PrintWriter, File}
class InputMetricsSuite extends FunSuite with SharedSparkContext {
- test("input metrics when reading text file") {
+ test("input metrics when reading text file with single split") {
val file = new File(getClass.getSimpleName + ".txt")
val pw = new PrintWriter(new FileWriter(file))
pw.println("some stuff")
@@ -48,6 +48,29 @@ class InputMetricsSuite extends FunSuite with SharedSparkContext {
// Wait for task end events to come in
sc.listenerBus.waitUntilEmpty(500)
assert(taskBytesRead.length == 2)
- assert(taskBytesRead.sum == file.length())
+ assert(taskBytesRead.sum >= file.length())
+ }
+
+ test("input metrics when reading text file with multiple splits") {
+ val file = new File(getClass.getSimpleName + ".txt")
+ val pw = new PrintWriter(new FileWriter(file))
+ for (i <- 0 until 10000) {
+ pw.println("some stuff")
+ }
+ pw.close()
+ file.deleteOnExit()
+
+ val taskBytesRead = new ArrayBuffer[Long]()
+ sc.addSparkListener(new SparkListener() {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
+ }
+ })
+ sc.textFile("file://" + file.getAbsolutePath, 2).count()
+
+ // Wait for task end events to come in
+ sc.listenerBus.waitUntilEmpty(500)
+ assert(taskBytesRead.length == 2)
+ assert(taskBytesRead.sum >= file.length())
}
}