Diffstat (limited to 'core/src/main')
5 files changed, 48 insertions(+), 146 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 23156072c3..941e2d13fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -18,7 +18,6 @@ package org.apache.spark.deploy
 
 import java.io.IOException
-import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
 
@@ -29,7 +28,6 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
-import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -140,54 +138,29 @@ class SparkHadoopUtil extends Logging {
   /**
    * Returns a function that can be called to find Hadoop FileSystem bytes read. If
    * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
-   * return the bytes read on r since t. Reflection is required because thread-level FileSystem
-   * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
-   * Returns None if the required method can't be found.
+   * return the bytes read on r since t.
+   *
+   * @return None if the required method can't be found.
    */
-  private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
-    try {
-      val threadStats = getFileSystemThreadStatistics()
-      val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
-      val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
-      val baselineBytesRead = f()
-      Some(() => f() - baselineBytesRead)
-    } catch {
-      case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
-        logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
-        None
-    }
+  private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
+    val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
+    val f = () => threadStats.map(_.getBytesRead).sum
+    val baselineBytesRead = f()
+    () => f() - baselineBytesRead
   }
 
   /**
    * Returns a function that can be called to find Hadoop FileSystem bytes written. If
    * getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
-   * return the bytes written on r since t. Reflection is required because thread-level FileSystem
-   * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
-   * Returns None if the required method can't be found.
+   * return the bytes written on r since t.
+   *
+   * @return None if the required method can't be found.
    */
-  private[spark] def getFSBytesWrittenOnThreadCallback(): Option[() => Long] = {
-    try {
-      val threadStats = getFileSystemThreadStatistics()
-      val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
-      val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
-      val baselineBytesWritten = f()
-      Some(() => f() - baselineBytesWritten)
-    } catch {
-      case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
-        logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
-        None
-    }
-  }
-
-  private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
-    FileSystem.getAllStatistics.asScala.map(
-      Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
-  }
-
-  private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
-    val statisticsDataClass =
-      Utils.classForName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
-    statisticsDataClass.getDeclaredMethod(methodName)
+  private[spark] def getFSBytesWrittenOnThreadCallback(): () => Long = {
+    val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
+    val f = () => threadStats.map(_.getBytesWritten).sum
+    val baselineBytesWritten = f()
+    () => f() - baselineBytesWritten
   }
 
   /**
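With Hadoop 2.6+ assumed, getFSBytesReadOnThreadCallback reads the per-thread FileSystem counters directly: it snapshots the calling thread's bytes-read total once, and the returned closure reports growth since that snapshot. A minimal standalone sketch of the same baseline/delta pattern, assuming hadoop-common is on the classpath (the object name and the file-draining demo are illustrative, not Spark code):

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical demo of the baseline/delta callback; not part of Spark.
object BytesReadCallbackDemo {

  /** Snapshot the calling thread's bytes-read total; the closure reports growth since then. */
  def bytesReadSince(): () => Long = {
    // Only schemes whose FileSystem has already been instantiated show up in getAllStatistics.
    val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
    val current = () => threadStats.map(_.getBytesRead).sum
    val baseline = current()
    () => current() - baseline
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())  // touch the FS first so statistics exist
    val callback = bytesReadSince()               // "time t": baseline for this thread
    val in = fs.open(new Path(args(0)))           // any readable file passed on the command line
    try {
      val buf = new Array[Byte](4096)
      while (in.read(buf) != -1) {}               // drain the stream to generate read traffic
    } finally {
      in.close()
    }
    println(s"bytes read on this thread since the baseline: ${callback()}")
  }
}
```

The bytes-written variant below has the same shape, with getBytesWritten in place of getBytesRead.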
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 63918ef12a..1e0a1e605c 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -125,8 +125,7 @@ object SparkHadoopMapReduceWriter extends Logging {
     val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
     committer.setupTask(taskContext)
 
-    val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
-      SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
+    val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
 
     // Initiate the writer.
     val taskFormat = outputFormat.newInstance()
@@ -149,8 +148,7 @@ object SparkHadoopMapReduceWriter extends Logging {
         writer.write(pair._1, pair._2)
 
         // Update bytes written metric every few records
-        SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
-          outputMetricsAndBytesWrittenCallback, recordsWritten)
+        SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
         recordsWritten += 1
       }
       if (writer != null) {
@@ -171,11 +169,8 @@ object SparkHadoopMapReduceWriter extends Logging {
         }
       })
 
-      outputMetricsAndBytesWrittenCallback.foreach {
-        case (om, callback) =>
-          om.setBytesWritten(callback())
-          om.setRecordsWritten(recordsWritten)
-      }
+      outputMetrics.setBytesWritten(callback())
+      outputMetrics.setRecordsWritten(recordsWritten)
 
       ret
     } catch {
@@ -222,24 +217,18 @@ object SparkHadoopWriterUtils {
 
   // TODO: these don't seem like the right abstractions.
   // We should abstract the duplicate code in a less awkward way.
 
-  // return type: (output metrics, bytes written callback), defined only if the latter is defined
-  def initHadoopOutputMetrics(
-      context: TaskContext): Option[(OutputMetrics, () => Long)] = {
+  def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, () => Long) = {
     val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
-    bytesWrittenCallback.map { b =>
-      (context.taskMetrics().outputMetrics, b)
-    }
+    (context.taskMetrics().outputMetrics, bytesWrittenCallback)
   }
 
   def maybeUpdateOutputMetrics(
-      outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
+      outputMetrics: OutputMetrics,
+      callback: () => Long,
       recordsWritten: Long): Unit = {
     if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
-      outputMetricsAndBytesWrittenCallback.foreach {
-        case (om, callback) =>
-          om.setBytesWritten(callback())
-          om.setRecordsWritten(recordsWritten)
-      }
+      outputMetrics.setBytesWritten(callback())
+      outputMetrics.setRecordsWritten(recordsWritten)
     }
   }
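Because the callback no longer lives in an Option, initHadoopOutputMetrics returns a plain (OutputMetrics, callback) pair and maybeUpdateOutputMetrics refreshes the metrics only every RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES records, plus one unconditional update after the loop. A self-contained sketch of that cadence with stand-in names (FakeOutputMetrics and RECORDS_BETWEEN_UPDATES are hypothetical, not Spark's class or constant):

```scala
// Sketch of the write-loop metric pattern above; everything here is a stand-in.
object WriteMetricsSketch {
  final class FakeOutputMetrics {
    var bytesWritten: Long = 0L
    var recordsWritten: Long = 0L
    def setBytesWritten(v: Long): Unit = bytesWritten = v
    def setRecordsWritten(v: Long): Unit = recordsWritten = v
  }

  val RECORDS_BETWEEN_UPDATES = 256

  // Mirrors maybeUpdateOutputMetrics: only refresh the (relatively expensive) byte count
  // every N records instead of on every write.
  def maybeUpdate(metrics: FakeOutputMetrics, callback: () => Long, recordsWritten: Long): Unit = {
    if (recordsWritten % RECORDS_BETWEEN_UPDATES == 0) {
      metrics.setBytesWritten(callback())
      metrics.setRecordsWritten(recordsWritten)
    }
  }

  def main(args: Array[String]): Unit = {
    var fakeBytes = 0L
    val callback = () => fakeBytes               // stands in for getFSBytesWrittenOnThreadCallback()
    val metrics = new FakeOutputMetrics
    var recordsWritten = 0L
    (1 to 1000).foreach { _ =>
      fakeBytes += 100                           // pretend each record writes 100 bytes
      maybeUpdate(metrics, callback, recordsWritten)
      recordsWritten += 1
    }
    // Final, unconditional update mirrors the code after the write loop.
    metrics.setBytesWritten(callback())
    metrics.setRecordsWritten(recordsWritten)
    println(s"bytes=${metrics.bytesWritten}, records=${metrics.recordsWritten}")
  }
}
```

In the real code the callback comes from getFSBytesWrittenOnThreadCallback(), so the two set calls after the loop record the final totals for the task.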
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index a83e139c13..5fa6a7ed31 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -25,15 +25,7 @@ import scala.collection.immutable.Map
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.mapred.FileSplit
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.InputSplit
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.JobID
-import org.apache.hadoop.mapred.RecordReader
-import org.apache.hadoop.mapred.Reporter
-import org.apache.hadoop.mapred.TaskAttemptID
-import org.apache.hadoop.mapred.TaskID
+import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.mapreduce.TaskType
 import org.apache.hadoop.util.ReflectionUtils
@@ -47,7 +39,7 @@ import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils}
+import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager}
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
@@ -229,11 +221,11 @@ class HadoopRDD[K, V](
       // creating RecordReader, because RecordReader's constructor might read some bytes
       private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
         case _: FileSplit | _: CombineFileSplit =>
-          SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+          Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
         case _ => None
       }
 
-      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // We get our input bytes from thread-local Hadoop FileSystem statistics.
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
      // previous partitions (SPARK-13071).
@@ -280,13 +272,9 @@ class HadoopRDD[K, V](
         (key, value)
       }
 
-      override def close() {
+      override def close(): Unit = {
         if (reader != null) {
           InputFileBlockHolder.unset()
-          // Close the reader and release it. Note: it's very important that we don't close the
-          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
-          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
-          // corruption issues when reading compressed input.
           try {
             reader.close()
           } catch {
@@ -326,18 +314,10 @@ class HadoopRDD[K, V](
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
-    val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
-      case Some(c) =>
-        try {
-          val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
-          val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
-          HadoopRDD.convertSplitLocationInfo(infos)
-        } catch {
-          case e: Exception =>
-            logDebug("Failed to use InputSplitWithLocations.", e)
-            None
-        }
-      case None => None
+    val locs = hsplit match {
+      case lsplit: InputSplitWithLocationInfo =>
+        HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
+      case _ => None
     }
     locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
   }
@@ -413,32 +393,12 @@ private[spark] object HadoopRDD extends Logging {
     }
   }
 
-  private[spark] class SplitInfoReflections {
-    val inputSplitWithLocationInfo =
-      Utils.classForName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
-    val getLocationInfo = inputSplitWithLocationInfo.getMethod("getLocationInfo")
-    val newInputSplit = Utils.classForName("org.apache.hadoop.mapreduce.InputSplit")
-    val newGetLocationInfo = newInputSplit.getMethod("getLocationInfo")
-    val splitLocationInfo = Utils.classForName("org.apache.hadoop.mapred.SplitLocationInfo")
-    val isInMemory = splitLocationInfo.getMethod("isInMemory")
-    val getLocation = splitLocationInfo.getMethod("getLocation")
-  }
-
-  private[spark] val SPLIT_INFO_REFLECTIONS: Option[SplitInfoReflections] = try {
-    Some(new SplitInfoReflections)
-  } catch {
-    case e: Exception =>
-      logDebug("SplitLocationInfo and other new Hadoop classes are " +
-        "unavailable. Using the older Hadoop location info code.", e)
-      None
-  }
-
-  private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
+  private[spark] def convertSplitLocationInfo(
+      infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
     Option(infos).map(_.flatMap { loc =>
-      val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
-      val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
+      val locationStr = loc.getLocation
      if (locationStr != "localhost") {
-        if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
+        if (loc.isInMemory) {
          logDebug(s"Partition $locationStr is cached by Hadoop.")
          Some(HDFSCacheTaskLocation(locationStr).toString)
        } else {
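Since InputSplitWithLocationInfo and SplitLocationInfo are always present now, getPreferredLocations pattern-matches on the split and convertSplitLocationInfo calls getLocation/isInMemory directly. A small sketch of that conversion with a stand-in type (FakeLocationInfo, the "hdfs_cache_" prefix, and the plain-host fallback are illustrative assumptions, not the exact Spark classes):

```scala
// Stand-in sketch of the location-conversion logic; not Spark code.
object SplitLocationSketch {
  // Mimics Hadoop's SplitLocationInfo accessors (getLocation, isInMemory) with a case class.
  final case class FakeLocationInfo(getLocation: String, isInMemory: Boolean)

  def convert(infos: Array[FakeLocationInfo]): Option[Seq[String]] =
    Option(infos).map(_.toSeq.flatMap { loc =>
      val locationStr = loc.getLocation
      if (locationStr != "localhost") {
        // HDFS-cached replicas get a cache-flavored task location; others a plain host name.
        if (loc.isInMemory) Some(s"hdfs_cache_$locationStr")  // stand-in for HDFSCacheTaskLocation
        else Some(locationStr)                                // stand-in for HostTaskLocation
      } else {
        None                                                  // localhost is never a useful hint
      }
    })

  def main(args: Array[String]): Unit = {
    val infos = Array(
      FakeLocationInfo("host1", isInMemory = true),
      FakeLocationInfo("host2", isInMemory = false),
      FakeLocationInfo("localhost", isInMemory = false))
    println(convert(infos))  // two usable locations; localhost is dropped
  }
}
```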
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 733e85f305..ce3a9a2a1e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -152,11 +152,11 @@ class NewHadoopRDD[K, V](
       private val getBytesReadCallback: Option[() => Long] =
         split.serializableHadoopSplit.value match {
           case _: FileSplit | _: CombineFileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+            Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
           case _ => None
         }
 
-      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // We get our input bytes from thread-local Hadoop FileSystem statistics.
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
@@ -231,13 +231,9 @@ class NewHadoopRDD[K, V](
         (reader.getCurrentKey, reader.getCurrentValue)
       }
 
-      private def close() {
+      private def close(): Unit = {
         if (reader != null) {
           InputFileBlockHolder.unset()
-          // Close the reader and release it. Note: it's very important that we don't close the
-          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
-          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
-          // corruption issues when reading compressed input.
           try {
             reader.close()
           } catch {
@@ -277,18 +273,7 @@ class NewHadoopRDD[K, V](
 
   override def getPreferredLocations(hsplit: Partition): Seq[String] = {
     val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value
-    val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
-      case Some(c) =>
-        try {
-          val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
-          HadoopRDD.convertSplitLocationInfo(infos)
-        } catch {
-          case e : Exception =>
-            logDebug("Failed to use InputSplit#getLocationInfo.", e)
-            None
-        }
-      case None => None
-    }
+    val locs = HadoopRDD.convertSplitLocationInfo(split.getLocationInfo)
     locs.getOrElse(split.getLocations.filter(_ != "localhost"))
   }
 
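NewHadoopRDD mirrors the HadoopRDD change: the now non-optional callback is wrapped in Some(...) only for file-based splits, and the SPARK-13071 comment still matters — when coalesced partitions run one after another on the same thread, each partition has to add its own delta on top of what earlier partitions already recorded instead of overwriting it. A toy sketch of that accumulation (every name here is illustrative, not Spark's):

```scala
// Toy illustration of the SPARK-13071 concern referenced in the comment above.
object CoalescedBytesReadSketch {
  final class FakeInputMetrics {
    var bytesRead: Long = 0L
    def setBytesRead(v: Long): Unit = bytesRead = v
  }

  def main(args: Array[String]): Unit = {
    var threadCounter = 0L                        // stands in for the thread-level FS statistics
    val metrics = new FakeInputMetrics

    // Two partitions processed back to back on the same thread (e.g. after coalesce).
    (1 to 2).foreach { _ =>
      val existingBytesRead = metrics.bytesRead   // snapshot before this partition starts
      val callback = { val base = threadCounter; () => threadCounter - base }
      threadCounter += 1000                       // pretend this partition read 1000 bytes
      // Correct: previous partitions' bytes plus this partition's delta, not just the delta.
      metrics.setBytesRead(existingBytesRead + callback())
    }
    println(metrics.bytesRead)                    // 2000, not 1000
  }
}
```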
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 41093bdb85..567a3183e2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -37,8 +37,7 @@ import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.OutputMetrics
-import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
+import org.apache.spark.internal.io.{SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
@@ -1126,8 +1125,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
 
-      val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
-        SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
+      val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
 
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
@@ -1139,16 +1137,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
 
           // Update bytes written metric every few records
-          SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
-            outputMetricsAndBytesWrittenCallback, recordsWritten)
+          SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
          recordsWritten += 1
        }
      }(finallyBlock = writer.close())
      writer.commit()
-      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
-        om.setBytesWritten(callback())
-        om.setRecordsWritten(recordsWritten)
-      }
+      outputMetrics.setBytesWritten(callback())
+      outputMetrics.setRecordsWritten(recordsWritten)
    }
 
    self.context.runJob(self, writeToFile)
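For context, the setBytesWritten/setRecordsWritten calls in this write path end up in the task's output metrics. A hypothetical local driver that observes them through a listener (assumes a Spark 2.x build containing this change; the listener wiring and the output path are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical driver program; run locally to see the output metrics set by the write path.
object OutputMetricsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("output-metrics-demo"))

    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // taskMetrics can be null for failed tasks, so guard before reading it.
        Option(taskEnd.taskMetrics).foreach { tm =>
          val out = tm.outputMetrics
          println(s"task ${taskEnd.taskInfo.taskId}: " +
            s"bytesWritten=${out.bytesWritten}, recordsWritten=${out.recordsWritten}")
        }
      }
    })

    sc.parallelize(1 to 1000, 2)
      .map(i => (i, i.toString))
      .saveAsTextFile(s"/tmp/output-metrics-demo-${System.currentTimeMillis()}")

    sc.stop()
  }
}
```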