author     Sandy Ryza <sandy@cloudera.com>        2014-10-27 10:04:24 -0700
committer  Patrick Wendell <pwendell@gmail.com>   2014-10-27 10:04:24 -0700
commit     dea302ddbd26b1f20fb8a3979bd1d8e1717479f8 (patch)
tree       dc144e0947d86c1547e86a2fb117fd65af5a2983
parent     c9e05ca27c9c702b510d424e3befc87213f24e0f (diff)
SPARK-2621. Update task InputMetrics incrementally
The patch takes advantage of an API provided in Hadoop 2.5 that allows getting accurate data on Hadoop FileSystem bytes read. It eliminates the old method, which naively reported the split size as the input bytes. One impact of this change is that input metrics go away when running against Hadoop versions earlier than 2.5. I can add this back in, but my opinion is that no metrics are better than inaccurate metrics.

This is difficult to write a test for because we don't usually build against a version of Hadoop that contains the function we need. I've tested it manually on a pseudo-distributed cluster.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #2087 from sryza/sandy-spark-2621 and squashes the following commits:

23010b8 [Sandy Ryza] Missing style fixes
74fc9bb [Sandy Ryza] Make getFSBytesReadOnThreadCallback private
1ab662d [Sandy Ryza] Clear things up a bit
984631f [Sandy Ryza] Switch from pull to push model and add test
7ef7b22 [Sandy Ryza] Add missing curly braces
219abc9 [Sandy Ryza] Fall back to split size
90dbc14 [Sandy Ryza] SPARK-2621. Update task InputMetrics incrementally
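The heart of the change is a reflection-based lookup of Hadoop's per-thread FileSystem read counters, snapshotted once so the callback reports a delta from that point. A minimal standalone sketch of that idea follows; the object and method names are illustrative, it is independent of Spark's SparkHadoopUtil wiring, and it assumes a Hadoop 2.5+ client on the classpath.

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileSystem.Statistics

import scala.collection.JavaConversions._

object ThreadBytesReadSketch {
  // HADOOP-10688 added per-thread read counters, but the StatisticsData holder is only
  // reachable via reflection, so pre-2.5 clients simply yield None.
  def bytesReadOnThreadCallback(scheme: String): Option[() => Long] = {
    val stats = FileSystem.getAllStatistics.filter(_.getScheme == scheme)
    try {
      val getThreadStatistics = classOf[Statistics].getDeclaredMethod("getThreadStatistics")
      getThreadStatistics.setAccessible(true)
      val statisticsData =
        Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
      val getBytesRead = statisticsData.getDeclaredMethod("getBytesRead")
      // Bytes read so far on the current thread, summed over all FileSystems for the scheme.
      val readTotal = () =>
        stats.map(s => getBytesRead.invoke(getThreadStatistics.invoke(s)).asInstanceOf[Long]).sum
      // Snapshot now so the returned callback reports the delta since this call.
      val baseline = readTotal()
      Some(() => readTotal() - baseline)
    } catch {
      case _: NoSuchMethodException | _: ClassNotFoundException => None // pre-2.5 Hadoop
    }
  }
}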
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala     30
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala        1
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala              48
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala           48
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                 11
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala  53
6 files changed, 170 insertions(+), 21 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index fe0ad9ebbc..e28eaad8a5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -20,12 +20,15 @@ package org.apache.spark.deploy
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
import scala.collection.JavaConversions._
@@ -121,6 +124,33 @@ class SparkHadoopUtil extends Logging {
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}
+ /**
+ * Returns a function that can be called to find Hadoop FileSystem bytes read. If
+ * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
+ * return the bytes read on r since t. Reflection is required because thread-level FileSystem
+ * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
+ * Returns None if the required method can't be found.
+ */
+ private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
+ : Option[() => Long] = {
+ val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
+ val scheme = qualifiedPath.toUri().getScheme()
+ val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
+ try {
+ val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+ val statisticsDataClass =
+ Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
+ val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead")
+ val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
+ val baselineBytesRead = f()
+ Some(() => f() - baselineBytesRead)
+ } catch {
+ case e: NoSuchMethodException => {
+ logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
+ None
+ }
+ }
+ }
}
object SparkHadoopUtil {
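As a usage note (a hypothetical sketch, not part of the patch, and assuming code living inside Spark's own packages since getFSBytesReadOnThreadCallback is private[spark]): the callback is delta-style, so a reader obtains it before consuming records, samples it every few records, and takes one final reading on close. This is exactly the pattern the HadoopRDD and NewHadoopRDD hunks below follow.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.{DataReadMethod, InputMetrics}

// Hypothetical helper: readNextRecord returns false once the underlying reader is exhausted.
def readWithMetrics(path: Path, conf: Configuration)(readNextRecord: () => Boolean): InputMetrics = {
  val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
  val bytesReadCallback = SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(path, conf)
  var recordsSinceUpdate = 0
  while (readNextRecord()) {
    recordsSinceUpdate += 1
    if (recordsSinceUpdate == 256) { // HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
      bytesReadCallback.foreach(f => inputMetrics.bytesRead = f())
      recordsSinceUpdate = 0
    }
  }
  bytesReadCallback.foreach(f => inputMetrics.bytesRead = f()) // final update on close
  inputMetrics
}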
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 3e49b6235a..57bc2b40ce 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -169,7 +169,6 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
var bytesRead: Long = 0L
}
-
/**
* :: DeveloperApi ::
* Metrics pertaining to shuffle data read in a given task.
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 775141775e..946fb5616d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -46,7 +46,6 @@ import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.{NextIterator, Utils}
import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
-
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
@@ -224,18 +223,18 @@ class HadoopRDD[K, V](
val key: K = reader.createKey()
val value: V = reader.createValue()
- // Set the task input metrics.
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- try {
- /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
- * always at record boundaries, so tasks may need to read into other splits to complete
- * a record. */
- inputMetrics.bytesRead = split.inputSplit.value.getLength()
- } catch {
- case e: java.io.IOException =>
- logWarning("Unable to get input size to set InputMetrics for task", e)
+ // Find a function that will return the FileSystem bytes read by this thread.
+ val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
+ split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
+ } else {
+ None
+ }
+ if (bytesReadCallback.isDefined) {
+ context.taskMetrics.inputMetrics = Some(inputMetrics)
}
- context.taskMetrics.inputMetrics = Some(inputMetrics)
+ var recordsSinceMetricsUpdate = 0
override def getNext() = {
try {
@@ -244,12 +243,36 @@ class HadoopRDD[K, V](
case eof: EOFException =>
finished = true
}
+
+ // Update bytes read metric every few records
+ if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
+ && bytesReadCallback.isDefined) {
+ recordsSinceMetricsUpdate = 0
+ val bytesReadFn = bytesReadCallback.get
+ inputMetrics.bytesRead = bytesReadFn()
+ } else {
+ recordsSinceMetricsUpdate += 1
+ }
(key, value)
}
override def close() {
try {
reader.close()
+ if (bytesReadCallback.isDefined) {
+ val bytesReadFn = bytesReadCallback.get
+ inputMetrics.bytesRead = bytesReadFn()
+ } else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
+ // If we can't get the bytes read from the FS stats, fall back to the split size,
+ // which may be inaccurate.
+ try {
+ inputMetrics.bytesRead = split.inputSplit.value.getLength
+ context.taskMetrics.inputMetrics = Some(inputMetrics)
+ } catch {
+ case e: java.io.IOException =>
+ logWarning("Unable to get input size to set InputMetrics for task", e)
+ }
+ }
} catch {
case e: Exception => {
if (!Utils.inShutdown()) {
@@ -302,6 +325,9 @@ private[spark] object HadoopRDD extends Logging {
*/
val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+ /** Update the input bytes read metric each time this number of records has been read */
+ val RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES = 256
+
/**
* The three methods below are helpers for accessing the local map, a property of the SparkEnv of
* the local process.
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 0cccdefc5e..3245632487 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -25,6 +25,7 @@ import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.input.WholeTextFileInputFormat
@@ -36,6 +37,7 @@ import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkHadoopUtil
private[spark] class NewHadoopPartition(
rddId: Int,
@@ -118,21 +120,22 @@ class NewHadoopRDD[K, V](
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- try {
- /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
- * always at record boundaries, so tasks may need to read into other splits to complete
- * a record. */
- inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength()
- } catch {
- case e: Exception =>
- logWarning("Unable to get input split size in order to set task input bytes", e)
+ // Find a function that will return the FileSystem bytes read by this thread.
+ val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
+ split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
+ } else {
+ None
+ }
+ if (bytesReadCallback.isDefined) {
+ context.taskMetrics.inputMetrics = Some(inputMetrics)
}
- context.taskMetrics.inputMetrics = Some(inputMetrics)
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => close())
var havePair = false
var finished = false
+ var recordsSinceMetricsUpdate = 0
override def hasNext: Boolean = {
if (!finished && !havePair) {
@@ -147,12 +150,39 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
+
+ // Update bytes read metric every few records
+ if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
+ && bytesReadCallback.isDefined) {
+ recordsSinceMetricsUpdate = 0
+ val bytesReadFn = bytesReadCallback.get
+ inputMetrics.bytesRead = bytesReadFn()
+ } else {
+ recordsSinceMetricsUpdate += 1
+ }
+
(reader.getCurrentKey, reader.getCurrentValue)
}
private def close() {
try {
reader.close()
+
+ // Update metrics with final amount
+ if (bytesReadCallback.isDefined) {
+ val bytesReadFn = bytesReadCallback.get
+ inputMetrics.bytesRead = bytesReadFn()
+ } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
+ // If we can't get the bytes read from the FS stats, fall back to the split size,
+ // which may be inaccurate.
+ try {
+ inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
+ context.taskMetrics.inputMetrics = Some(inputMetrics)
+ } catch {
+ case e: java.io.IOException =>
+ logWarning("Unable to get input size to set InputMetrics for task", e)
+ }
+ }
} catch {
case e: Exception => {
if (!Utils.inShutdown()) {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 84ed5db8f0..93ac9f1c33 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1673,6 +1673,17 @@ private[spark] object Utils extends Logging {
PropertyConfigurator.configure(pro)
}
+ def invoke(
+ clazz: Class[_],
+ obj: AnyRef,
+ methodName: String,
+ args: (Class[_], AnyRef)*): AnyRef = {
+ val (types, values) = args.unzip
+ val method = clazz.getDeclaredMethod(methodName, types: _*)
+ method.setAccessible(true)
+ method.invoke(obj, values.toSeq: _*)
+ }
+
}
/**
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
new file mode 100644
index 0000000000..33bd1afea2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SharedSparkContext
+import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
+
+import scala.collection.mutable.ArrayBuffer
+
+import java.io.{FileWriter, PrintWriter, File}
+
+class InputMetricsSuite extends FunSuite with SharedSparkContext {
+ test("input metrics when reading text file") {
+ val file = new File(getClass.getSimpleName + ".txt")
+ val pw = new PrintWriter(new FileWriter(file))
+ pw.println("some stuff")
+ pw.println("some other stuff")
+ pw.println("yet more stuff")
+ pw.println("too much stuff")
+ pw.close()
+ file.deleteOnExit()
+
+ val taskBytesRead = new ArrayBuffer[Long]()
+ sc.addSparkListener(new SparkListener() {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
+ }
+ })
+ sc.textFile("file://" + file.getAbsolutePath, 2).count()
+
+ // Wait for task end events to come in
+ sc.listenerBus.waitUntilEmpty(500)
+ assert(taskBytesRead.length == 2)
+ assert(taskBytesRead.sum == file.length())
+ }
+}