-rw-r--r--  core/src/main/scala/org/apache/spark/executor/InputMetrics.scala        | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala       | 30
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala  | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala         | 27
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala                |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala             |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                      |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala            |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala    |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala       |  2
-rw-r--r--  project/MimaExcludes.scala                                              |  5
11 files changed, 40 insertions(+), 97 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 6d30d3c76a..83e11c5e23 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -81,35 +81,9 @@ class InputMetrics private (
*/
def readMethod: DataReadMethod.Value = DataReadMethod.withName(_readMethod.localValue)
- // Once incBytesRead & intRecordsRead is ready to be removed from the public API
- // we can remove the internal versions and make the previous public API private.
- // This has been done to suppress warnings when building.
- @deprecated("incrementing input metrics is for internal use only", "2.0.0")
- def incBytesRead(v: Long): Unit = _bytesRead.add(v)
- private[spark] def incBytesReadInternal(v: Long): Unit = _bytesRead.add(v)
- @deprecated("incrementing input metrics is for internal use only", "2.0.0")
- def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
- private[spark] def incRecordsReadInternal(v: Long): Unit = _recordsRead.add(v)
+ private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
+ private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
- private[spark] def setReadMethod(v: DataReadMethod.Value): Unit =
- _readMethod.setValue(v.toString)
+ private[spark] def setReadMethod(v: DataReadMethod.Value): Unit = _readMethod.setValue(v.toString)
}
-
-/**
- * Deprecated methods to preserve case class matching behavior before Spark 2.0.
- */
-object InputMetrics {
-
- @deprecated("matching on InputMetrics will not be supported in the future", "2.0.0")
- def apply(readMethod: DataReadMethod.Value): InputMetrics = {
- val im = new InputMetrics
- im.setReadMethod(readMethod)
- im
- }
-
- @deprecated("matching on InputMetrics will not be supported in the future", "2.0.0")
- def unapply(input: InputMetrics): Option[DataReadMethod.Value] = {
- Some(input.readMethod)
- }
-}
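
The InputMetrics companion object deleted above existed only to preserve case-class style matching from before Spark 2.0. A minimal migration sketch for downstream code that still pattern-matched on it; the isHadoopRead helper is hypothetical and not part of this patch:

    import org.apache.spark.executor.{DataReadMethod, InputMetrics}

    // Before: matching on the now-removed extractor.
    //   inputMetrics match { case Some(InputMetrics(DataReadMethod.Hadoop)) => ... }
    // After: read the public accessor directly instead of pattern matching.
    def isHadoopRead(input: Option[InputMetrics]): Boolean =
      input.exists(_.readMethod == DataReadMethod.Hadoop)
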
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index 0b37d559c7..93f953846f 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -52,18 +52,6 @@ class OutputMetrics private (
}
/**
- * Create a new [[OutputMetrics]] that is not associated with any particular task.
- *
- * This is only used for preserving matching behavior on [[OutputMetrics]], which used to be
- * a case class before Spark 2.0. Once we remove support for matching on [[OutputMetrics]]
- * we can remove this constructor as well.
- */
- private[executor] def this() {
- this(InternalAccumulator.createOutputAccums()
- .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]])
- }
-
- /**
* Total number of bytes written.
*/
def bytesWritten: Long = _bytesWritten.localValue
@@ -84,21 +72,3 @@ class OutputMetrics private (
_writeMethod.setValue(v.toString)
}
-
-/**
- * Deprecated methods to preserve case class matching behavior before Spark 2.0.
- */
-object OutputMetrics {
-
- @deprecated("matching on OutputMetrics will not be supported in the future", "2.0.0")
- def apply(writeMethod: DataWriteMethod.Value): OutputMetrics = {
- val om = new OutputMetrics
- om.setWriteMethod(writeMethod)
- om
- }
-
- @deprecated("matching on OutputMetrics will not be supported in the future", "2.0.0")
- def unapply(output: OutputMetrics): Option[DataWriteMethod.Value] = {
- Some(output.writeMethod)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index 50bb645d97..71a24770b5 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -116,4 +116,25 @@ class ShuffleReadMetrics private (
private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v)
private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
+ /**
+ * Resets the value of the current metrics (`this`) and merges all the independent
+ * [[ShuffleReadMetrics]] into `this`.
+ */
+ private[spark] def setMergeValues(metrics: Seq[ShuffleReadMetrics]): Unit = {
+ _remoteBlocksFetched.setValue(_remoteBlocksFetched.zero)
+ _localBlocksFetched.setValue(_localBlocksFetched.zero)
+ _remoteBytesRead.setValue(_remoteBytesRead.zero)
+ _localBytesRead.setValue(_localBytesRead.zero)
+ _fetchWaitTime.setValue(_fetchWaitTime.zero)
+ _recordsRead.setValue(_recordsRead.zero)
+ metrics.foreach { metric =>
+ _remoteBlocksFetched.add(metric.remoteBlocksFetched)
+ _localBlocksFetched.add(metric.localBlocksFetched)
+ _remoteBytesRead.add(metric.remoteBytesRead)
+ _localBytesRead.add(metric.localBytesRead)
+ _fetchWaitTime.add(metric.fetchWaitTime)
+ _recordsRead.add(metric.recordsRead)
+ }
+ }
+
}
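
The new setMergeValues follows a reset-then-accumulate pattern, so repeated calls never double-count a temporary metrics object. A standalone sketch of the same idea using plain Long fields; SimpleReadMetrics is illustrative and not part of Spark:

    // Illustrative mirror of setMergeValues without Spark's accumulators.
    final class SimpleReadMetrics(var remoteBytesRead: Long = 0L, var recordsRead: Long = 0L) {
      def setMergeValues(metrics: Seq[SimpleReadMetrics]): Unit = {
        // Reset to zero first, then fold every independent reader's counts in.
        remoteBytesRead = 0L
        recordsRead = 0L
        metrics.foreach { m =>
          remoteBytesRead += m.remoteBytesRead
          recordsRead += m.recordsRead
        }
      }
    }
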
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 02219a84ab..bda2a91d9d 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -139,16 +139,6 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
*/
def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.localValue
- @deprecated("use updatedBlockStatuses instead", "2.0.0")
- def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = {
- if (updatedBlockStatuses.nonEmpty) Some(updatedBlockStatuses) else None
- }
-
- @deprecated("setting updated blocks is not allowed", "2.0.0")
- def updatedBlocks_=(blocks: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
- blocks.foreach(setUpdatedBlockStatuses)
- }
-
// Setters and increment-ers
private[spark] def setExecutorDeserializeTime(v: Long): Unit =
_executorDeserializeTime.setValue(v)
@@ -225,11 +215,6 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
*/
def outputMetrics: Option[OutputMetrics] = _outputMetrics
- @deprecated("setting OutputMetrics is for internal use only", "2.0.0")
- def outputMetrics_=(om: Option[OutputMetrics]): Unit = {
- _outputMetrics = om
- }
-
/**
* Get or create a new [[OutputMetrics]] associated with this task.
*/
@@ -285,12 +270,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
if (tempShuffleReadMetrics.nonEmpty) {
val metrics = new ShuffleReadMetrics(initialAccumsMap)
- metrics.setRemoteBlocksFetched(tempShuffleReadMetrics.map(_.remoteBlocksFetched).sum)
- metrics.setLocalBlocksFetched(tempShuffleReadMetrics.map(_.localBlocksFetched).sum)
- metrics.setFetchWaitTime(tempShuffleReadMetrics.map(_.fetchWaitTime).sum)
- metrics.setRemoteBytesRead(tempShuffleReadMetrics.map(_.remoteBytesRead).sum)
- metrics.setLocalBytesRead(tempShuffleReadMetrics.map(_.localBytesRead).sum)
- metrics.setRecordsRead(tempShuffleReadMetrics.map(_.recordsRead).sum)
+ metrics.setMergeValues(tempShuffleReadMetrics)
_shuffleReadMetrics = Some(metrics)
}
}
@@ -306,11 +286,6 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
*/
def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics
- @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0")
- def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = {
- _shuffleWriteMetrics = swm
- }
-
/**
* Get or create a new [[ShuffleWriteMetrics]] associated with this task.
*/
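
With the updatedBlocks accessor and the OutputMetrics/ShuffleWriteMetrics setters gone, callers read the non-Option accessor that this patch keeps. A hedged sketch of the replacement for updatedBlocks; the helper name below is illustrative:

    import org.apache.spark.executor.TaskMetrics
    import org.apache.spark.storage.{BlockId, BlockStatus}

    // Before: tm.updatedBlocks returned Option[Seq[(BlockId, BlockStatus)]],
    // treating an empty sequence as None.
    // After: use the accessor retained by this patch; it is simply empty when
    // no blocks were updated.
    def updatedBlocks(tm: TaskMetrics): Seq[(BlockId, BlockStatus)] =
      tm.updatedBlockStatuses
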
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f7c646c668..35d190b464 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -259,7 +259,7 @@ class HadoopRDD[K, V](
finished = true
}
if (!finished) {
- inputMetrics.incRecordsReadInternal(1)
+ inputMetrics.incRecordsRead(1)
}
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
updateBytesRead()
@@ -291,7 +291,7 @@ class HadoopRDD[K, V](
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.incBytesReadInternal(split.inputSplit.value.getLength)
+ inputMetrics.incBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index fb9606ae38..3ccd616cbf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -189,7 +189,7 @@ class NewHadoopRDD[K, V](
}
havePair = false
if (!finished) {
- inputMetrics.incRecordsReadInternal(1)
+ inputMetrics.incRecordsRead(1)
}
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
updateBytesRead()
@@ -220,7 +220,7 @@ class NewHadoopRDD[K, V](
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.incBytesReadInternal(split.serializableHadoopSplit.value.getLength)
+ inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 032939b49a..36ff3bcaae 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -333,10 +333,10 @@ abstract class RDD[T: ClassTag](
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
- existingMetrics.incBytesReadInternal(blockResult.bytes)
+ existingMetrics.incBytesRead(blockResult.bytes)
new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
override def next(): T = {
- existingMetrics.incRecordsReadInternal(1)
+ existingMetrics.incRecordsRead(1)
delegate.next()
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 3b78458065..558767e36f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -813,8 +813,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
val readMethod = DataReadMethod.withName((inJson \ "Data Read Method").extract[String])
val inputMetrics = metrics.registerInputMetrics(readMethod)
- inputMetrics.incBytesReadInternal((inJson \ "Bytes Read").extract[Long])
- inputMetrics.incRecordsReadInternal((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
+ inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
+ inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
}
// Updated blocks
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 088b05403c..d91f50f18f 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -285,8 +285,8 @@ class TaskMetricsSuite extends SparkFunSuite {
// set and increment values
in.setBytesRead(1L)
in.setBytesRead(2L)
- in.incRecordsReadInternal(1L)
- in.incRecordsReadInternal(2L)
+ in.incRecordsRead(1L)
+ in.incRecordsRead(2L)
in.setReadMethod(DataReadMethod.Disk)
// assert new values exist
assertValEquals(_.bytesRead, BYTES_READ, 2L)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6a2d4c9f2c..de6f408fa8 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -853,7 +853,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
if (hasHadoopInput) {
val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
inputMetrics.setBytesRead(d + e + f)
- inputMetrics.incRecordsReadInternal(if (hasRecords) (d + e + f) / 100 else -1)
+ inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
} else {
val sr = t.registerTempShuffleReadMetrics()
sr.incRemoteBytesRead(b + d)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 313bf93b5d..71f337ce1f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -627,7 +627,10 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.DistributedLDAModel.this")
) ++ Seq(
// [SPARK-14475] Propagate user-defined context from driver to executors
- ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.getLocalProperty")
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.getLocalProperty"),
+ // [SPARK-14617] Remove deprecated APIs in TaskMetrics
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.InputMetrics$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.OutputMetrics$")
)
case v if v.startsWith("1.6") =>
Seq(
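
The two new MiMa excludes target class names ending in a dollar sign because a Scala companion object compiles to a separate JVM class with that suffix; deleting object InputMetrics and object OutputMetrics removes org.apache.spark.executor.InputMetrics$ and OutputMetrics$ from the binary, which MiMa reports as a MissingClassProblem unless excluded. A small illustration, with Demo as a hypothetical name:

    class Demo           // compiles to the JVM class Demo
    object Demo          // compiles to the separate JVM class Demo$
    // Removing `object Demo` deletes Demo$ from the resulting jar, so a binary
    // compatibility checker such as MiMa flags the missing class.
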