-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala          16
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala              1
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala                    12
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala                 15
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala             11
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala  97
6 files changed, 120 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 57f9faf5dd..211e3ede53 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -133,10 +133,9 @@ class SparkHadoopUtil extends Logging {
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
*/
- private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
- : Option[() => Long] = {
+ private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
try {
- val threadStats = getFileSystemThreadStatistics(path, conf)
+ val threadStats = getFileSystemThreadStatistics()
val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesRead = f()
@@ -156,10 +155,9 @@ class SparkHadoopUtil extends Logging {
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
*/
- private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
- : Option[() => Long] = {
+ private[spark] def getFSBytesWrittenOnThreadCallback(): Option[() => Long] = {
try {
- val threadStats = getFileSystemThreadStatistics(path, conf)
+ val threadStats = getFileSystemThreadStatistics()
val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesWritten = f()
@@ -172,10 +170,8 @@ class SparkHadoopUtil extends Logging {
}
}
- private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
- val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
- val scheme = qualifiedPath.toUri().getScheme()
- val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
+ private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
+ val stats = FileSystem.getAllStatistics()
stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
}
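
A minimal usage sketch of the now-parameterless read callback (illustrative only, not part of the patch; it assumes the caller lives inside the org.apache.spark package, since the helper is private[spark], and a Hadoop 2.5+ runtime where per-thread FileSystem statistics exist):

    import org.apache.spark.deploy.SparkHadoopUtil

    // Create the callback before any reads happen on this thread; the helper
    // records a baseline (baselineBytesRead above) at creation time.
    val bytesRead: Option[() => Long] =
      SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()

    // ... this thread reads records through one or more Hadoop FileSystems ...

    // None means the reflective lookup failed (e.g. pre-2.5 Hadoop). Otherwise the
    // value is summed across all registered FileSystems, with no per-scheme filtering.
    bytesRead.foreach(f => println(s"bytes read on this thread: ${f()}"))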
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index ddb5903bf6..97912c68c5 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -19,7 +19,6 @@ package org.apache.spark.executor
import java.util.concurrent.atomic.AtomicLong
-import org.apache.spark.executor.DataReadMethod
import org.apache.spark.executor.DataReadMethod.DataReadMethod
import scala.collection.mutable.ArrayBuffer
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 056aef0bc2..c3e3931042 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapred.JobID
import org.apache.hadoop.mapred.TaskAttemptID
import org.apache.hadoop.mapred.TaskID
+import org.apache.hadoop.mapred.lib.CombineFileSplit
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark._
@@ -218,13 +219,13 @@ class HadoopRDD[K, V](
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+ val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
split.inputSplit.value match {
- case split: FileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
+ case _: FileSplit | _: CombineFileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
}
- )
+ }
inputMetrics.setBytesReadCallback(bytesReadCallback)
var reader: RecordReader[K, V] = null
@@ -254,7 +255,8 @@ class HadoopRDD[K, V](
reader.close()
if (bytesReadCallback.isDefined) {
inputMetrics.updateBytesRead()
- } else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
+ } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
+ split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
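
The same split check and fallback appears again below for NewHadoopRDD. A hedged standalone sketch of that pattern (hypothetical helper, not the patch's code; same private[spark] visibility caveat as above):

    import org.apache.hadoop.mapred.{FileSplit, InputSplit}
    import org.apache.hadoop.mapred.lib.CombineFileSplit
    import org.apache.spark.deploy.SparkHadoopUtil

    // Prefer exact per-thread FileSystem statistics; if they are unavailable
    // (e.g. Hadoop < 2.5), fall back to the split length, which may be inaccurate.
    def bytesReadForSplit(split: InputSplit): Option[Long] = split match {
      case _: FileSplit | _: CombineFileSplit =>
        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
          .map(callback => callback())     // exact bytes read on this thread
          .orElse(Some(split.getLength))   // approximate fallback: split size
      case _ => None                       // unknown split type: no estimate
    }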
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 7b0e3c87cc..d86f95ac3e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.input.WholeTextFileInputFormat
@@ -34,7 +34,7 @@ import org.apache.spark.Logging
import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}
-import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.Utils
@@ -114,13 +114,13 @@ class NewHadoopRDD[K, V](
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+ val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
split.serializableHadoopSplit.value match {
- case split: FileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
+ case _: FileSplit | _: CombineFileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
}
- )
+ }
inputMetrics.setBytesReadCallback(bytesReadCallback)
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
@@ -163,7 +163,8 @@ class NewHadoopRDD[K, V](
reader.close()
if (bytesReadCallback.isDefined) {
inputMetrics.updateBytesRead()
- } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
+ } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+ split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 0f37d830ef..49b88a90ab 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -990,7 +990,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
- val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
+ val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
try {
@@ -1061,7 +1061,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
- val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
+ val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
@@ -1086,11 +1086,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.commitJob()
}
- private def initHadoopOutputMetrics(context: TaskContext, config: Configuration)
- : (OutputMetrics, Option[() => Long]) = {
- val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
- .map(new Path(_))
- .flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
+ private def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, Option[() => Long]) = {
+ val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
if (bytesWrittenCallback.isDefined) {
context.taskMetrics.outputMetrics = Some(outputMetrics)
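
The write path mirrors the read path: initHadoopOutputMetrics now obtains its callback without a path or Configuration. A hedged sketch of the overall shape (illustrative only; same private[spark] caveat, and the real code feeds the value into OutputMetrics rather than printing it):

    import org.apache.spark.deploy.SparkHadoopUtil

    // Create the callback before the RecordWriter is opened, so the captured
    // baseline excludes any bytes written earlier on this thread.
    val bytesWrittenCallback: Option[() => Long] =
      SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()

    // ... open the writer, write all records, commit the task ...

    // If available, the callback reports bytes written by this thread across
    // all registered FileSystems since the baseline was taken.
    bytesWrittenCallback.foreach(f => println(s"bytes written: ${f()}"))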
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 10a39990f8..81db66ae17 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -26,7 +26,16 @@ import org.scalatest.FunSuite
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
+import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => OldInputSplit, JobConf,
+ LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader, Reporter,
+ TextInputFormat => OldTextInputFormat}
+import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => OldCombineFileInputFormat,
+ CombineFileSplit => OldCombineFileSplit, CombineFileRecordReader => OldCombineFileRecordReader}
+import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader => NewRecordReader,
+ TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => NewCombineFileInputFormat,
+ CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => NewCombineFileSplit,
+ FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
import org.apache.spark.SharedSparkContext
import org.apache.spark.deploy.SparkHadoopUtil
@@ -202,7 +211,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
val fs = FileSystem.getLocal(new Configuration())
val outPath = new Path(fs.getWorkingDirectory, "outdir")
- if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined) {
+ if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
val taskBytesWritten = new ArrayBuffer[Long]()
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@@ -225,4 +234,88 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
}
}
}
+
+ test("input metrics with old CombineFileInputFormat") {
+ val bytesRead = runAndReturnBytesRead {
+ sc.hadoopFile(tmpFilePath, classOf[OldCombineTextInputFormat], classOf[LongWritable],
+ classOf[Text], 2).count()
+ }
+ assert(bytesRead >= tmpFile.length())
+ }
+
+ test("input metrics with new CombineFileInputFormat") {
+ val bytesRead = runAndReturnBytesRead {
+ sc.newAPIHadoopFile(tmpFilePath, classOf[NewCombineTextInputFormat], classOf[LongWritable],
+ classOf[Text], new Configuration()).count()
+ }
+ assert(bytesRead >= tmpFile.length())
+ }
+}
+
+/**
+ * Hadoop 2 has a version of this, but we can't use it because we need to remain
+ * backwards compatible with older Hadoop releases that lack it.
+ */
+class OldCombineTextInputFormat extends OldCombineFileInputFormat[LongWritable, Text] {
+ override def getRecordReader(split: OldInputSplit, conf: JobConf, reporter: Reporter)
+ : OldRecordReader[LongWritable, Text] = {
+ new OldCombineFileRecordReader[LongWritable, Text](conf,
+ split.asInstanceOf[OldCombineFileSplit], reporter, classOf[OldCombineTextRecordReaderWrapper]
+ .asInstanceOf[Class[OldRecordReader[LongWritable, Text]]])
+ }
+}
+
+class OldCombineTextRecordReaderWrapper(
+ split: OldCombineFileSplit,
+ conf: Configuration,
+ reporter: Reporter,
+ idx: Integer) extends OldRecordReader[LongWritable, Text] {
+
+ val fileSplit = new OldFileSplit(split.getPath(idx),
+ split.getOffset(idx),
+ split.getLength(idx),
+ split.getLocations())
+
+ val delegate: OldLineRecordReader = new OldTextInputFormat().getRecordReader(fileSplit,
+ conf.asInstanceOf[JobConf], reporter).asInstanceOf[OldLineRecordReader]
+
+ override def next(key: LongWritable, value: Text): Boolean = delegate.next(key, value)
+ override def createKey(): LongWritable = delegate.createKey()
+ override def createValue(): Text = delegate.createValue()
+ override def getPos(): Long = delegate.getPos
+ override def close(): Unit = delegate.close()
+ override def getProgress(): Float = delegate.getProgress
+}
+
+/**
+ * Hadoop 2 has a version of this, but we can't use it because we need to remain
+ * backwards compatible with older Hadoop releases that lack it.
+ */
+class NewCombineTextInputFormat extends NewCombineFileInputFormat[LongWritable,Text] {
+ def createRecordReader(split: NewInputSplit, context: TaskAttemptContext)
+ : NewRecordReader[LongWritable, Text] = {
+ new NewCombineFileRecordReader[LongWritable,Text](split.asInstanceOf[NewCombineFileSplit],
+ context, classOf[NewCombineTextRecordReaderWrapper])
+ }
}
+
+class NewCombineTextRecordReaderWrapper(
+ split: NewCombineFileSplit,
+ context: TaskAttemptContext,
+ idx: Integer) extends NewRecordReader[LongWritable, Text] {
+
+ val fileSplit = new NewFileSplit(split.getPath(idx),
+ split.getOffset(idx),
+ split.getLength(idx),
+ split.getLocations())
+
+ val delegate = new NewTextInputFormat().createRecordReader(fileSplit, context)
+
+ override def initialize(split: NewInputSplit, context: TaskAttemptContext): Unit = {
+ delegate.initialize(fileSplit, context)
+ }
+
+ override def nextKeyValue(): Boolean = delegate.nextKeyValue()
+ override def getCurrentKey(): LongWritable = delegate.getCurrentKey
+ override def getCurrentValue(): Text = delegate.getCurrentValue
+ override def getProgress(): Float = delegate.getProgress
+ override def close(): Unit = delegate.close()
+}
\ No newline at end of file