path: root/core/src/main/scala
author    Kostas Sakellis <kostas@cloudera.com>    2015-01-15 18:48:39 -0800
committer Patrick Wendell <pwendell@gmail.com>     2015-01-15 18:48:39 -0800
commit    a79a9f923c47f2ce7da93cf0ecfe2b66fcb9fdd4 (patch)
tree      ed37829d87ea0568cd8978b785e2f44f43fd766a /core/src/main/scala
parent    96c2c714f4f9abe20d4c42d99ffaafcb269714a1 (diff)
[SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds
When calculating the input metrics there was an assumption that one task only reads from one block; this is not true for some operations, including coalesce. This patch simply increments the task's input metrics if previous ones of the same read method exist. A limitation of this patch is that if a task reads from two blocks with different read methods, one will override the other.

Author: Kostas Sakellis <kostas@cloudera.com>

Closes #3120 from ksakellis/kostas-spark-4092 and squashes the following commits:

54e6658 [Kostas Sakellis] Drops metrics if conflicting read methods exist
f0e0cc5 [Kostas Sakellis] Add bytesReadCallback to InputMetrics
a2a36d4 [Kostas Sakellis] CR feedback
5a0c770 [Kostas Sakellis] [SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds
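The core of the change is TaskMetrics.getInputMetricsForReadMethod (see the TaskMetrics.scala diff below). A minimal, self-contained sketch of its merge semantics, using stripped-down stand-ins rather than the real Spark classes (DataReadMethod, InputMetrics, TaskMetrics and CoalescedReadDemo here are simplified illustrations, not the patched API):

// Simplified stand-ins illustrating the accumulate-by-read-method behavior of this patch.
object DataReadMethod extends Enumeration {
  val Memory, Disk, Hadoop, Network = Value
}

case class InputMetrics(readMethod: DataReadMethod.Value) {
  private var _bytesRead = 0L
  def bytesRead: Long = _bytesRead
  def addBytesRead(bytes: Long): Unit = { _bytesRead += bytes }
}

class TaskMetrics {
  private var _inputMetrics: Option[InputMetrics] = None
  def inputMetrics: Option[InputMetrics] = _inputMetrics

  // Reuse the existing InputMetrics when the read method matches, so bytes read from
  // multiple blocks (e.g. after coalesce) accumulate. A conflicting read method gets a
  // fresh, unrecorded InputMetrics (the "drops metrics" limitation described above).
  def getInputMetricsForReadMethod(readMethod: DataReadMethod.Value): InputMetrics =
    _inputMetrics match {
      case Some(metrics) if metrics.readMethod == readMethod => metrics
      case Some(_) => new InputMetrics(readMethod)
      case None =>
        val metrics = new InputMetrics(readMethod)
        _inputMetrics = Some(metrics)
        metrics
    }
}

object CoalescedReadDemo extends App {
  val tm = new TaskMetrics
  // One task reading two cached blocks via the same read method: bytes accumulate.
  tm.getInputMetricsForReadMethod(DataReadMethod.Memory).addBytesRead(100)
  tm.getInputMetricsForReadMethod(DataReadMethod.Memory).addBytesRead(250)
  println(tm.inputMetrics.map(_.bytesRead)) // Some(350)
  // A block read via a different method is not recorded on this task.
  tm.getInputMetricsForReadMethod(DataReadMethod.Disk).addBytesRead(42)
  println(tm.inputMetrics.map(_.bytesRead)) // still Some(350)
}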
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala          |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala     |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala  | 75
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala         | 39
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala      | 40
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala     |  6
7 files changed, 109 insertions(+), 60 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 80da62c44e..a0c0372b7f 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
- context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+ val inputMetrics = blockResult.inputMetrics
+ val existingMetrics = context.taskMetrics
+ .getInputMetricsForReadMethod(inputMetrics.readMethod)
+ existingMetrics.addBytesRead(inputMetrics.bytesRead)
+
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
case None =>
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index b75c77b5b4..6660b98eb8 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -379,6 +379,7 @@ private[spark] class Executor(
if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
+ metrics.updateInputMetrics()
metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
if (isLocal) {
// JobProgressListener will hold a reference to it during
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 51b5328cb4..7eb10f95e0 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,11 @@
package org.apache.spark.executor
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.executor.DataReadMethod.DataReadMethod
+
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.annotation.DeveloperApi
@@ -80,7 +85,17 @@ class TaskMetrics extends Serializable {
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
* are stored here.
*/
- var inputMetrics: Option[InputMetrics] = None
+ private var _inputMetrics: Option[InputMetrics] = None
+
+ def inputMetrics = _inputMetrics
+
+ /**
+ * This should only be used when recreating TaskMetrics, not when updating input metrics in
+ * executors
+ */
+ private[spark] def setInputMetrics(inputMetrics: Option[InputMetrics]) {
+ _inputMetrics = inputMetrics
+ }
/**
* If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
@@ -134,6 +149,30 @@ class TaskMetrics extends Serializable {
}
/**
+ * Returns the input metrics object that the task should use. Currently, if
+ * there exists an input metric with the same readMethod, we return that one
+ * so the caller can accumulate bytes read. If the readMethod is different
+ * from the one previously seen by this task, we return a new InputMetrics but don't
+ * record it.
+ *
+ * Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
+ * we can store all the different inputMetrics (one per readMethod).
+ */
+ private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod):
InputMetrics = synchronized {
+ _inputMetrics match {
+ case None =>
+ val metrics = new InputMetrics(readMethod)
+ _inputMetrics = Some(metrics)
+ metrics
+ case Some(metrics @ InputMetrics(method)) if method == readMethod =>
+ metrics
+ case Some(InputMetrics(method)) =>
+ new InputMetrics(readMethod)
+ }
+ }
+
+ /**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
private[spark] def updateShuffleReadMetrics() = synchronized {
@@ -146,6 +185,10 @@ class TaskMetrics extends Serializable {
}
_shuffleReadMetrics = Some(merged)
}
+
+ private[spark] def updateInputMetrics() = synchronized {
+ inputMetrics.foreach(_.updateBytesRead())
+ }
}
private[spark] object TaskMetrics {
@@ -179,10 +222,38 @@ object DataWriteMethod extends Enumeration with Serializable {
*/
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {
+
+ private val _bytesRead: AtomicLong = new AtomicLong()
+
/**
* Total bytes read.
*/
- var bytesRead: Long = 0L
+ def bytesRead: Long = _bytesRead.get()
+ @volatile @transient var bytesReadCallback: Option[() => Long] = None
+
+ /**
+ * Adds additional bytes read for this read method.
+ */
+ def addBytesRead(bytes: Long) = {
+ _bytesRead.addAndGet(bytes)
+ }
+
+ /**
+ * Invoke the bytesReadCallback and mutate bytesRead.
+ */
+ def updateBytesRead() {
+ bytesReadCallback.foreach { c =>
+ _bytesRead.set(c())
+ }
+ }
+
+ /**
+ * Register a function that can be called to get up-to-date information on how many bytes the task
+ * has read from an input source.
+ */
+ def setBytesReadCallback(f: Option[() => Long]) {
+ bytesReadCallback = f
+ }
}
/**
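For context, the HadoopRDD and NewHadoopRDD diffs below consume this API in the same way; the following is a condensed sketch of that reader-side pattern, with simplified stand-in names (SketchInputMetrics, ReaderSketch, readSplit and fsBytesReadOnThisThread are illustrative, not Spark API), and the RecordReader plumbing omitted:

// Condensed sketch of the reader-side pattern used by HadoopRDD/NewHadoopRDD below.
class SketchInputMetrics {
  private var _bytesRead = 0L
  @volatile var bytesReadCallback: Option[() => Long] = None
  def setBytesReadCallback(f: Option[() => Long]): Unit = { bytesReadCallback = f }
  def addBytesRead(bytes: Long): Unit = { _bytesRead += bytes }
  def updateBytesRead(): Unit = bytesReadCallback.foreach(c => _bytesRead = c())
  def bytesRead: Long = _bytesRead
}

object ReaderSketch {
  def readSplit(
      inputMetrics: SketchInputMetrics,
      fsBytesReadOnThisThread: Option[() => Long], // e.g. from FileSystem statistics
      splitLength: Long): Unit = {
    // Reuse a callback registered by an earlier split in the same task; otherwise fall
    // back to the per-thread filesystem statistics callback, if one is available.
    val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(fsBytesReadOnThisThread)
    inputMetrics.setBytesReadCallback(bytesReadCallback)

    // ... records are read here ...

    // On close: prefer the exact filesystem counter; otherwise fall back to the split
    // size, which may be inaccurate.
    if (bytesReadCallback.isDefined) inputMetrics.updateBytesRead()
    else inputMetrics.addBytesRead(splitLength)
  }
}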
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 37e0c13029..3b99d3a6ca 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,18 +213,19 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
- val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ val inputMetrics = context.taskMetrics
+ .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
- split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
- } else {
- None
- }
- if (bytesReadCallback.isDefined) {
- context.taskMetrics.inputMetrics = Some(inputMetrics)
- }
+ val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+ split.inputSplit.value match {
+ case split: FileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
+ case _ => None
+ }
+ )
+ inputMetrics.setBytesReadCallback(bytesReadCallback)
var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
@@ -237,8 +238,6 @@ class HadoopRDD[K, V](
val key: K = reader.createKey()
val value: V = reader.createValue()
- var recordsSinceMetricsUpdate = 0
-
override def getNext() = {
try {
finished = !reader.next(key, value)
@@ -246,16 +245,6 @@ class HadoopRDD[K, V](
case eof: EOFException =>
finished = true
}
-
- // Update bytes read metric every few records
- if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
- && bytesReadCallback.isDefined) {
- recordsSinceMetricsUpdate = 0
- val bytesReadFn = bytesReadCallback.get
- inputMetrics.bytesRead = bytesReadFn()
- } else {
- recordsSinceMetricsUpdate += 1
- }
(key, value)
}
@@ -263,14 +252,12 @@ class HadoopRDD[K, V](
try {
reader.close()
if (bytesReadCallback.isDefined) {
- val bytesReadFn = bytesReadCallback.get
- inputMetrics.bytesRead = bytesReadFn()
+ inputMetrics.updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.bytesRead = split.inputSplit.value.getLength
- context.taskMetrics.inputMetrics = Some(inputMetrics)
+ inputMetrics.addBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index e55d03d391..890ec677c2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -109,18 +109,19 @@ class NewHadoopRDD[K, V](
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
- val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ val inputMetrics = context.taskMetrics
+ .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
- split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
- } else {
- None
- }
- if (bytesReadCallback.isDefined) {
- context.taskMetrics.inputMetrics = Some(inputMetrics)
- }
+ val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+ split.serializableHadoopSplit.value match {
+ case split: FileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
+ case _ => None
+ }
+ )
+ inputMetrics.setBytesReadCallback(bytesReadCallback)
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
@@ -153,34 +154,19 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
-
- // Update bytes read metric every few records
- if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
- && bytesReadCallback.isDefined) {
- recordsSinceMetricsUpdate = 0
- val bytesReadFn = bytesReadCallback.get
- inputMetrics.bytesRead = bytesReadFn()
- } else {
- recordsSinceMetricsUpdate += 1
- }
-
(reader.getCurrentKey, reader.getCurrentValue)
}
private def close() {
try {
reader.close()
-
- // Update metrics with final amount
if (bytesReadCallback.isDefined) {
- val bytesReadFn = bytesReadCallback.get
- inputMetrics.bytesRead = bytesReadFn()
+ inputMetrics.updateBytesRead()
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
- context.taskMetrics.inputMetrics = Some(inputMetrics)
+ inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1427305d91..8bc5a1cd18 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -53,7 +53,7 @@ private[spark] class BlockResult(
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
- inputMetrics.bytesRead = bytes
+ inputMetrics.addBytesRead(bytes)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a025011006..ee3756c226 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -637,8 +637,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
- metrics.inputMetrics =
- Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
+ metrics.setInputMetrics(
+ Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson))
metrics.outputMetrics =
Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson)
metrics.updatedBlocks =
@@ -671,7 +671,7 @@ private[spark] object JsonProtocol {
def inputMetricsFromJson(json: JValue): InputMetrics = {
val metrics = new InputMetrics(
DataReadMethod.withName((json \ "Data Read Method").extract[String]))
- metrics.bytesRead = (json \ "Bytes Read").extract[Long]
+ metrics.addBytesRead((json \ "Bytes Read").extract[Long])
metrics
}