aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-04-14 10:56:13 -0700
committerAndrew Or <andrew@databricks.com>2016-04-14 10:56:13 -0700
commita46f98d3f4ba6a79f4ef789806fec80a7d4f342d (patch)
tree5791bcb790685104288a1e51e097cdc8811ab1f6
parentdac40b68dc52d5ab855dfde63f0872064aa3d999 (diff)
downloadspark-a46f98d3f4ba6a79f4ef789806fec80a7d4f342d.tar.gz
spark-a46f98d3f4ba6a79f4ef789806fec80a7d4f342d.tar.bz2
spark-a46f98d3f4ba6a79f4ef789806fec80a7d4f342d.zip
[SPARK-14617] Remove deprecated APIs in TaskMetrics
## What changes were proposed in this pull request? This patch removes some of the deprecated APIs in TaskMetrics. This is part of my bigger effort to simplify accumulators and task metrics. ## How was this patch tested? N/A - only removals Author: Reynold Xin <rxin@databricks.com> Closes #12375 from rxin/SPARK-14617.
-rw-r--r--core/src/main/scala/org/apache/spark/executor/InputMetrics.scala32
-rw-r--r--core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala30
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala2
-rw-r--r--project/MimaExcludes.scala5
11 files changed, 40 insertions, 97 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 6d30d3c76a..83e11c5e23 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -81,35 +81,9 @@ class InputMetrics private (
*/
def readMethod: DataReadMethod.Value = DataReadMethod.withName(_readMethod.localValue)
- // Once incBytesRead & intRecordsRead is ready to be removed from the public API
- // we can remove the internal versions and make the previous public API private.
- // This has been done to suppress warnings when building.
- @deprecated("incrementing input metrics is for internal use only", "2.0.0")
- def incBytesRead(v: Long): Unit = _bytesRead.add(v)
- private[spark] def incBytesReadInternal(v: Long): Unit = _bytesRead.add(v)
- @deprecated("incrementing input metrics is for internal use only", "2.0.0")
- def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
- private[spark] def incRecordsReadInternal(v: Long): Unit = _recordsRead.add(v)
+ private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
+ private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
- private[spark] def setReadMethod(v: DataReadMethod.Value): Unit =
- _readMethod.setValue(v.toString)
+ private[spark] def setReadMethod(v: DataReadMethod.Value): Unit = _readMethod.setValue(v.toString)
}
-
-/**
- * Deprecated methods to preserve case class matching behavior before Spark 2.0.
- */
-object InputMetrics {
-
- @deprecated("matching on InputMetrics will not be supported in the future", "2.0.0")
- def apply(readMethod: DataReadMethod.Value): InputMetrics = {
- val im = new InputMetrics
- im.setReadMethod(readMethod)
- im
- }
-
- @deprecated("matching on InputMetrics will not be supported in the future", "2.0.0")
- def unapply(input: InputMetrics): Option[DataReadMethod.Value] = {
- Some(input.readMethod)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index 0b37d559c7..93f953846f 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -52,18 +52,6 @@ class OutputMetrics private (
}
/**
- * Create a new [[OutputMetrics]] that is not associated with any particular task.
- *
- * This is only used for preserving matching behavior on [[OutputMetrics]], which used to be
- * a case class before Spark 2.0. Once we remove support for matching on [[OutputMetrics]]
- * we can remove this constructor as well.
- */
- private[executor] def this() {
- this(InternalAccumulator.createOutputAccums()
- .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]])
- }
-
- /**
* Total number of bytes written.
*/
def bytesWritten: Long = _bytesWritten.localValue
@@ -84,21 +72,3 @@ class OutputMetrics private (
_writeMethod.setValue(v.toString)
}
-
-/**
- * Deprecated methods to preserve case class matching behavior before Spark 2.0.
- */
-object OutputMetrics {
-
- @deprecated("matching on OutputMetrics will not be supported in the future", "2.0.0")
- def apply(writeMethod: DataWriteMethod.Value): OutputMetrics = {
- val om = new OutputMetrics
- om.setWriteMethod(writeMethod)
- om
- }
-
- @deprecated("matching on OutputMetrics will not be supported in the future", "2.0.0")
- def unapply(output: OutputMetrics): Option[DataWriteMethod.Value] = {
- Some(output.writeMethod)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index 50bb645d97..71a24770b5 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -116,4 +116,25 @@ class ShuffleReadMetrics private (
private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v)
private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
+ /**
+ * Resets the value of the current metrics (`this`) and and merges all the independent
+ * [[ShuffleReadMetrics]] into `this`.
+ */
+ private[spark] def setMergeValues(metrics: Seq[ShuffleReadMetrics]): Unit = {
+ _remoteBlocksFetched.setValue(_remoteBlocksFetched.zero)
+ _localBlocksFetched.setValue(_localBlocksFetched.zero)
+ _remoteBytesRead.setValue(_remoteBytesRead.zero)
+ _localBytesRead.setValue(_localBytesRead.zero)
+ _fetchWaitTime.setValue(_fetchWaitTime.zero)
+ _recordsRead.setValue(_recordsRead.zero)
+ metrics.foreach { metric =>
+ _remoteBlocksFetched.add(metric.remoteBlocksFetched)
+ _localBlocksFetched.add(metric.localBlocksFetched)
+ _remoteBytesRead.add(metric.remoteBytesRead)
+ _localBytesRead.add(metric.localBytesRead)
+ _fetchWaitTime.add(metric.fetchWaitTime)
+ _recordsRead.add(metric.recordsRead)
+ }
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 02219a84ab..bda2a91d9d 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -139,16 +139,6 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
*/
def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.localValue
- @deprecated("use updatedBlockStatuses instead", "2.0.0")
- def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = {
- if (updatedBlockStatuses.nonEmpty) Some(updatedBlockStatuses) else None
- }
-
- @deprecated("setting updated blocks is not allowed", "2.0.0")
- def updatedBlocks_=(blocks: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
- blocks.foreach(setUpdatedBlockStatuses)
- }
-
// Setters and increment-ers
private[spark] def setExecutorDeserializeTime(v: Long): Unit =
_executorDeserializeTime.setValue(v)
@@ -225,11 +215,6 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
*/
def outputMetrics: Option[OutputMetrics] = _outputMetrics
- @deprecated("setting OutputMetrics is for internal use only", "2.0.0")
- def outputMetrics_=(om: Option[OutputMetrics]): Unit = {
- _outputMetrics = om
- }
-
/**
* Get or create a new [[OutputMetrics]] associated with this task.
*/
@@ -285,12 +270,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
if (tempShuffleReadMetrics.nonEmpty) {
val metrics = new ShuffleReadMetrics(initialAccumsMap)
- metrics.setRemoteBlocksFetched(tempShuffleReadMetrics.map(_.remoteBlocksFetched).sum)
- metrics.setLocalBlocksFetched(tempShuffleReadMetrics.map(_.localBlocksFetched).sum)
- metrics.setFetchWaitTime(tempShuffleReadMetrics.map(_.fetchWaitTime).sum)
- metrics.setRemoteBytesRead(tempShuffleReadMetrics.map(_.remoteBytesRead).sum)
- metrics.setLocalBytesRead(tempShuffleReadMetrics.map(_.localBytesRead).sum)
- metrics.setRecordsRead(tempShuffleReadMetrics.map(_.recordsRead).sum)
+ metrics.setMergeValues(tempShuffleReadMetrics)
_shuffleReadMetrics = Some(metrics)
}
}
@@ -306,11 +286,6 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
*/
def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics
- @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0")
- def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = {
- _shuffleWriteMetrics = swm
- }
-
/**
* Get or create a new [[ShuffleWriteMetrics]] associated with this task.
*/
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f7c646c668..35d190b464 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -259,7 +259,7 @@ class HadoopRDD[K, V](
finished = true
}
if (!finished) {
- inputMetrics.incRecordsReadInternal(1)
+ inputMetrics.incRecordsRead(1)
}
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
updateBytesRead()
@@ -291,7 +291,7 @@ class HadoopRDD[K, V](
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.incBytesReadInternal(split.inputSplit.value.getLength)
+ inputMetrics.incBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index fb9606ae38..3ccd616cbf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -189,7 +189,7 @@ class NewHadoopRDD[K, V](
}
havePair = false
if (!finished) {
- inputMetrics.incRecordsReadInternal(1)
+ inputMetrics.incRecordsRead(1)
}
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
updateBytesRead()
@@ -220,7 +220,7 @@ class NewHadoopRDD[K, V](
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.incBytesReadInternal(split.serializableHadoopSplit.value.getLength)
+ inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 032939b49a..36ff3bcaae 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -333,10 +333,10 @@ abstract class RDD[T: ClassTag](
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
- existingMetrics.incBytesReadInternal(blockResult.bytes)
+ existingMetrics.incBytesRead(blockResult.bytes)
new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
override def next(): T = {
- existingMetrics.incRecordsReadInternal(1)
+ existingMetrics.incRecordsRead(1)
delegate.next()
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 3b78458065..558767e36f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -813,8 +813,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
val readMethod = DataReadMethod.withName((inJson \ "Data Read Method").extract[String])
val inputMetrics = metrics.registerInputMetrics(readMethod)
- inputMetrics.incBytesReadInternal((inJson \ "Bytes Read").extract[Long])
- inputMetrics.incRecordsReadInternal((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
+ inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
+ inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
}
// Updated blocks
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 088b05403c..d91f50f18f 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -285,8 +285,8 @@ class TaskMetricsSuite extends SparkFunSuite {
// set and increment values
in.setBytesRead(1L)
in.setBytesRead(2L)
- in.incRecordsReadInternal(1L)
- in.incRecordsReadInternal(2L)
+ in.incRecordsRead(1L)
+ in.incRecordsRead(2L)
in.setReadMethod(DataReadMethod.Disk)
// assert new values exist
assertValEquals(_.bytesRead, BYTES_READ, 2L)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6a2d4c9f2c..de6f408fa8 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -853,7 +853,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
if (hasHadoopInput) {
val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
inputMetrics.setBytesRead(d + e + f)
- inputMetrics.incRecordsReadInternal(if (hasRecords) (d + e + f) / 100 else -1)
+ inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
} else {
val sr = t.registerTempShuffleReadMetrics()
sr.incRemoteBytesRead(b + d)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 313bf93b5d..71f337ce1f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -627,7 +627,10 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.DistributedLDAModel.this")
) ++ Seq(
// [SPARK-14475] Propagate user-defined context from driver to executors
- ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.getLocalProperty")
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.getLocalProperty"),
+ // [SPARK-14617] Remove deprecated APIs in TaskMetrics
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.InputMetrics$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.OutputMetrics$")
)
case v if v.startsWith("1.6") =>
Seq(