aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKostas Sakellis <kostas@cloudera.com>2015-01-15 18:48:39 -0800
committerPatrick Wendell <pwendell@gmail.com>2015-01-15 18:48:39 -0800
commita79a9f923c47f2ce7da93cf0ecfe2b66fcb9fdd4 (patch)
treeed37829d87ea0568cd8978b785e2f44f43fd766a /core
parent96c2c714f4f9abe20d4c42d99ffaafcb269714a1 (diff)
downloadspark-a79a9f923c47f2ce7da93cf0ecfe2b66fcb9fdd4.tar.gz
spark-a79a9f923c47f2ce7da93cf0ecfe2b66fcb9fdd4.tar.bz2
spark-a79a9f923c47f2ce7da93cf0ecfe2b66fcb9fdd4.zip
[SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds
When calculating the input metrics there was an assumption that one task only reads from one block - this is not true for some operations including coalesce. This patch simply increments the task's input metrics if previous ones existed of the same read method. A limitation to this patch is that if a task reads from two different blocks of different read methods, one will override the other. Author: Kostas Sakellis <kostas@cloudera.com> Closes #3120 from ksakellis/kostas-spark-4092 and squashes the following commits: 54e6658 [Kostas Sakellis] Drops metrics if conflicting read methods exist f0e0cc5 [Kostas Sakellis] Add bytesReadCallback to InputMetrics a2a36d4 [Kostas Sakellis] CR feedback 5a0c770 [Kostas Sakellis] [SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/CacheManager.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala75
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala39
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala40
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala195
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala4
10 files changed, 270 insertions, 102 deletions
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 80da62c44e..a0c0372b7f 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
- context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+ val inputMetrics = blockResult.inputMetrics
+ val existingMetrics = context.taskMetrics
+ .getInputMetricsForReadMethod(inputMetrics.readMethod)
+ existingMetrics.addBytesRead(inputMetrics.bytesRead)
+
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
case None =>
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index b75c77b5b4..6660b98eb8 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -379,6 +379,7 @@ private[spark] class Executor(
if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
+ metrics.updateInputMetrics()
metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
if (isLocal) {
// JobProgressListener will hold an reference of it during
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 51b5328cb4..7eb10f95e0 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,11 @@
package org.apache.spark.executor
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.executor.DataReadMethod.DataReadMethod
+
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.annotation.DeveloperApi
@@ -80,7 +85,17 @@ class TaskMetrics extends Serializable {
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
* are stored here.
*/
- var inputMetrics: Option[InputMetrics] = None
+ private var _inputMetrics: Option[InputMetrics] = None
+
+ def inputMetrics = _inputMetrics
+
+ /**
+ * This should only be used when recreating TaskMetrics, not when updating input metrics in
+ * executors
+ */
+ private[spark] def setInputMetrics(inputMetrics: Option[InputMetrics]) {
+ _inputMetrics = inputMetrics
+ }
/**
* If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
@@ -134,6 +149,30 @@ class TaskMetrics extends Serializable {
}
/**
+ * Returns the input metrics object that the task should use. Currently, if
+ * there exists an input metric with the same readMethod, we return that one
+ * so the caller can accumulate bytes read. If the readMethod is different
+ * than previously seen by this task, we return a new InputMetric but don't
+ * record it.
+ *
+ * Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
+ * we can store all the different inputMetrics (one per readMethod).
+ */
+ private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod):
+ InputMetrics =synchronized {
+ _inputMetrics match {
+ case None =>
+ val metrics = new InputMetrics(readMethod)
+ _inputMetrics = Some(metrics)
+ metrics
+ case Some(metrics @ InputMetrics(method)) if method == readMethod =>
+ metrics
+ case Some(InputMetrics(method)) =>
+ new InputMetrics(readMethod)
+ }
+ }
+
+ /**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
private[spark] def updateShuffleReadMetrics() = synchronized {
@@ -146,6 +185,10 @@ class TaskMetrics extends Serializable {
}
_shuffleReadMetrics = Some(merged)
}
+
+ private[spark] def updateInputMetrics() = synchronized {
+ inputMetrics.foreach(_.updateBytesRead())
+ }
}
private[spark] object TaskMetrics {
@@ -179,10 +222,38 @@ object DataWriteMethod extends Enumeration with Serializable {
*/
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {
+
+ private val _bytesRead: AtomicLong = new AtomicLong()
+
/**
* Total bytes read.
*/
- var bytesRead: Long = 0L
+ def bytesRead: Long = _bytesRead.get()
+ @volatile @transient var bytesReadCallback: Option[() => Long] = None
+
+ /**
+ * Adds additional bytes read for this read method.
+ */
+ def addBytesRead(bytes: Long) = {
+ _bytesRead.addAndGet(bytes)
+ }
+
+ /**
+ * Invoke the bytesReadCallback and mutate bytesRead.
+ */
+ def updateBytesRead() {
+ bytesReadCallback.foreach { c =>
+ _bytesRead.set(c())
+ }
+ }
+
+ /**
+ * Register a function that can be called to get up-to-date information on how many bytes the task
+ * has read from an input source.
+ */
+ def setBytesReadCallback(f: Option[() => Long]) {
+ bytesReadCallback = f
+ }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 37e0c13029..3b99d3a6ca 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,18 +213,19 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
- val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ val inputMetrics = context.taskMetrics
+ .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
- split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
- } else {
- None
- }
- if (bytesReadCallback.isDefined) {
- context.taskMetrics.inputMetrics = Some(inputMetrics)
- }
+ val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+ split.inputSplit.value match {
+ case split: FileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
+ case _ => None
+ }
+ )
+ inputMetrics.setBytesReadCallback(bytesReadCallback)
var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
@@ -237,8 +238,6 @@ class HadoopRDD[K, V](
val key: K = reader.createKey()
val value: V = reader.createValue()
- var recordsSinceMetricsUpdate = 0
-
override def getNext() = {
try {
finished = !reader.next(key, value)
@@ -246,16 +245,6 @@ class HadoopRDD[K, V](
case eof: EOFException =>
finished = true
}
-
- // Update bytes read metric every few records
- if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
- && bytesReadCallback.isDefined) {
- recordsSinceMetricsUpdate = 0
- val bytesReadFn = bytesReadCallback.get
- inputMetrics.bytesRead = bytesReadFn()
- } else {
- recordsSinceMetricsUpdate += 1
- }
(key, value)
}
@@ -263,14 +252,12 @@ class HadoopRDD[K, V](
try {
reader.close()
if (bytesReadCallback.isDefined) {
- val bytesReadFn = bytesReadCallback.get
- inputMetrics.bytesRead = bytesReadFn()
+ inputMetrics.updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.bytesRead = split.inputSplit.value.getLength
- context.taskMetrics.inputMetrics = Some(inputMetrics)
+ inputMetrics.addBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index e55d03d391..890ec677c2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -109,18 +109,19 @@ class NewHadoopRDD[K, V](
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
- val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ val inputMetrics = context.taskMetrics
+ .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
- split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
- } else {
- None
- }
- if (bytesReadCallback.isDefined) {
- context.taskMetrics.inputMetrics = Some(inputMetrics)
- }
+ val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+ split.serializableHadoopSplit.value match {
+ case split: FileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
+ case _ => None
+ }
+ )
+ inputMetrics.setBytesReadCallback(bytesReadCallback)
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
@@ -153,34 +154,19 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
-
- // Update bytes read metric every few records
- if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
- && bytesReadCallback.isDefined) {
- recordsSinceMetricsUpdate = 0
- val bytesReadFn = bytesReadCallback.get
- inputMetrics.bytesRead = bytesReadFn()
- } else {
- recordsSinceMetricsUpdate += 1
- }
-
(reader.getCurrentKey, reader.getCurrentValue)
}
private def close() {
try {
reader.close()
-
- // Update metrics with final amount
if (bytesReadCallback.isDefined) {
- val bytesReadFn = bytesReadCallback.get
- inputMetrics.bytesRead = bytesReadFn()
+ inputMetrics.updateBytesRead()
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
- context.taskMetrics.inputMetrics = Some(inputMetrics)
+ inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1427305d91..8bc5a1cd18 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -53,7 +53,7 @@ private[spark] class BlockResult(
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
- inputMetrics.bytesRead = bytes
+ inputMetrics.addBytesRead(bytes)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a025011006..ee3756c226 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -637,8 +637,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
- metrics.inputMetrics =
- Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
+ metrics.setInputMetrics(
+ Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson))
metrics.outputMetrics =
Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson)
metrics.updatedBlocks =
@@ -671,7 +671,7 @@ private[spark] object JsonProtocol {
def inputMetricsFromJson(json: JValue): InputMetrics = {
val metrics = new InputMetrics(
DataReadMethod.withName((json \ "Data Read Method").extract[String]))
- metrics.bytesRead = (json \ "Bytes Read").extract[Long]
+ metrics.addBytesRead((json \ "Bytes Read").extract[Long])
metrics
}
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index f8bcde12a3..10a39990f8 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -17,66 +17,185 @@
package org.apache.spark.metrics
-import java.io.{FileWriter, PrintWriter, File}
+import java.io.{File, FileWriter, PrintWriter}
-import org.apache.spark.SharedSparkContext
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
+import scala.collection.mutable.ArrayBuffer
import org.scalatest.FunSuite
-import org.scalatest.Matchers
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
-import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.SharedSparkContext
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.util.Utils
+
+class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
-class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Matchers {
- test("input metrics when reading text file with single split") {
- val file = new File(getClass.getSimpleName + ".txt")
- val pw = new PrintWriter(new FileWriter(file))
- pw.println("some stuff")
- pw.println("some other stuff")
- pw.println("yet more stuff")
- pw.println("too much stuff")
+ @transient var tmpDir: File = _
+ @transient var tmpFile: File = _
+ @transient var tmpFilePath: String = _
+
+ override def beforeAll() {
+ super.beforeAll()
+
+ tmpDir = Utils.createTempDir()
+ val testTempDir = new File(tmpDir, "test")
+ testTempDir.mkdir()
+
+ tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
+ val pw = new PrintWriter(new FileWriter(tmpFile))
+ for (x <- 1 to 1000000) {
+ pw.println("s")
+ }
pw.close()
- file.deleteOnExit()
- val taskBytesRead = new ArrayBuffer[Long]()
- sc.addSparkListener(new SparkListener() {
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
- }
- })
- sc.textFile("file://" + file.getAbsolutePath, 2).count()
+ // Path to tmpFile
+ tmpFilePath = "file://" + tmpFile.getAbsolutePath
+ }
- // Wait for task end events to come in
- sc.listenerBus.waitUntilEmpty(500)
- assert(taskBytesRead.length == 2)
- assert(taskBytesRead.sum >= file.length())
+ override def afterAll() {
+ super.afterAll()
+ Utils.deleteRecursively(tmpDir)
}
- test("input metrics when reading text file with multiple splits") {
- val file = new File(getClass.getSimpleName + ".txt")
- val pw = new PrintWriter(new FileWriter(file))
- for (i <- 0 until 10000) {
- pw.println("some stuff")
+ test("input metrics for old hadoop with coalesce") {
+ val bytesRead = runAndReturnBytesRead {
+ sc.textFile(tmpFilePath, 4).count()
+ }
+ val bytesRead2 = runAndReturnBytesRead {
+ sc.textFile(tmpFilePath, 4).coalesce(2).count()
+ }
+ assert(bytesRead != 0)
+ assert(bytesRead == bytesRead2)
+ assert(bytesRead2 >= tmpFile.length())
+ }
+
+ test("input metrics with cache and coalesce") {
+ // prime the cache manager
+ val rdd = sc.textFile(tmpFilePath, 4).cache()
+ rdd.collect()
+
+ val bytesRead = runAndReturnBytesRead {
+ rdd.count()
+ }
+ val bytesRead2 = runAndReturnBytesRead {
+ rdd.coalesce(4).count()
}
- pw.close()
- file.deleteOnExit()
+ // for count and coelesce, the same bytes should be read.
+ assert(bytesRead != 0)
+ assert(bytesRead2 == bytesRead)
+ }
+
+ /**
+ * This checks the situation where we have interleaved reads from
+ * different sources. Currently, we only accumulate fron the first
+ * read method we find in the task. This test uses cartesian to create
+ * the interleaved reads.
+ *
+ * Once https://issues.apache.org/jira/browse/SPARK-5225 is fixed
+ * this test should break.
+ */
+ test("input metrics with mixed read method") {
+ // prime the cache manager
+ val numPartitions = 2
+ val rdd = sc.parallelize(1 to 100, numPartitions).cache()
+ rdd.collect()
+
+ val rdd2 = sc.textFile(tmpFilePath, numPartitions)
+
+ val bytesRead = runAndReturnBytesRead {
+ rdd.count()
+ }
+ val bytesRead2 = runAndReturnBytesRead {
+ rdd2.count()
+ }
+
+ val cartRead = runAndReturnBytesRead {
+ rdd.cartesian(rdd2).count()
+ }
+
+ assert(cartRead != 0)
+ assert(bytesRead != 0)
+ // We read from the first rdd of the cartesian once per partition.
+ assert(cartRead == bytesRead * numPartitions)
+ }
+
+ test("input metrics for new Hadoop API with coalesce") {
+ val bytesRead = runAndReturnBytesRead {
+ sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
+ classOf[Text]).count()
+ }
+ val bytesRead2 = runAndReturnBytesRead {
+ sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
+ classOf[Text]).coalesce(5).count()
+ }
+ assert(bytesRead != 0)
+ assert(bytesRead2 == bytesRead)
+ assert(bytesRead >= tmpFile.length())
+ }
+
+ test("input metrics when reading text file") {
+ val bytesRead = runAndReturnBytesRead {
+ sc.textFile(tmpFilePath, 2).count()
+ }
+ assert(bytesRead >= tmpFile.length())
+ }
+
+ test("input metrics with interleaved reads") {
+ val numPartitions = 2
+ val cartVector = 0 to 9
+ val cartFile = new File(tmpDir, getClass.getSimpleName + "_cart.txt")
+ val cartFilePath = "file://" + cartFile.getAbsolutePath
+
+ // write files to disk so we can read them later.
+ sc.parallelize(cartVector).saveAsTextFile(cartFilePath)
+ val aRdd = sc.textFile(cartFilePath, numPartitions)
+
+ val tmpRdd = sc.textFile(tmpFilePath, numPartitions)
+
+ val firstSize= runAndReturnBytesRead {
+ aRdd.count()
+ }
+ val secondSize = runAndReturnBytesRead {
+ tmpRdd.count()
+ }
+
+ val cartesianBytes = runAndReturnBytesRead {
+ aRdd.cartesian(tmpRdd).count()
+ }
+
+ // Computing the amount of bytes read for a cartesian operation is a little involved.
+ // Cartesian interleaves reads between two partitions eg. p1 and p2.
+ // Here are the steps:
+ // 1) First it creates an iterator for p1
+ // 2) Creates an iterator for p2
+ // 3) Reads the first element of p1 and then all the elements of p2
+ // 4) proceeds to the next element of p1
+ // 5) Creates a new iterator for p2
+ // 6) rinse and repeat.
+ // As a result we read from the second partition n times where n is the number of keys in
+ // p1. Thus the math below for the test.
+ assert(cartesianBytes != 0)
+ assert(cartesianBytes == firstSize * numPartitions + (cartVector.length * secondSize))
+ }
+
+ private def runAndReturnBytesRead(job : => Unit): Long = {
val taskBytesRead = new ArrayBuffer[Long]()
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
}
})
- sc.textFile("file://" + file.getAbsolutePath, 2).count()
- // Wait for task end events to come in
+ job
+
sc.listenerBus.waitUntilEmpty(500)
- assert(taskBytesRead.length == 2)
- assert(taskBytesRead.sum >= file.length())
+ taskBytesRead.sum
}
test("output metrics when writing text file") {
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 12af60caf7..f865d8ca04 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -231,8 +231,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskMetrics.diskBytesSpilled = base + 5
taskMetrics.memoryBytesSpilled = base + 6
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- taskMetrics.inputMetrics = Some(inputMetrics)
- inputMetrics.bytesRead = base + 7
+ taskMetrics.setInputMetrics(Some(inputMetrics))
+ inputMetrics.addBytesRead(base + 7)
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
taskMetrics.outputMetrics = Some(outputMetrics)
outputMetrics.bytesWritten = base + 8
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 5ba94ff67d..71dfed1289 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -630,8 +630,8 @@ class JsonProtocolSuite extends FunSuite {
if (hasHadoopInput) {
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- inputMetrics.bytesRead = d + e + f
- t.inputMetrics = Some(inputMetrics)
+ inputMetrics.addBytesRead(d + e + f)
+ t.setInputMetrics(Some(inputMetrics))
} else {
val sr = new ShuffleReadMetrics
sr.remoteBytesRead = b + d