author     Andrew Or <andrew@databricks.com>    2016-01-18 13:27:18 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-01-18 13:27:18 -0800
commit     302bb569f3e1f09e2e7cea5e4e7f5c6953b0fc82 (patch)
tree       cc431e6dc190a213dc5633984a0dbf66323e6a19
parent     5e492e9d5bc0992cbcffe64a9aaf3b334b173d2c (diff)
[SPARK-12884] Move classes to their own files for readability
This is a small step toward implementing SPARK-10620, which migrates `TaskMetrics` to accumulators. This patch is strictly a cleanup and introduces no change in functionality: it only moves classes into their own files, instead of keeping single monolithic files that contain 10 different classes.

Parent PR: #10717

Author: Andrew Or <andrew@databricks.com>

Closes #10810 from andrewor14/move-things.
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulable.scala (renamed from core/src/main/scala/org/apache/spark/Accumulators.scala) | 179
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulator.scala                  | 160
-rw-r--r--  core/src/main/scala/org/apache/spark/InternalAccumulator.scala          |  58
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/InputMetrics.scala        |  77
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala       |  53
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala  |  87
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala |  53
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala         | 186
8 files changed, 493 insertions(+), 360 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index 5592b75afb..a456d420b8 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -20,14 +20,12 @@ package org.apache.spark
import java.io.{ObjectInputStream, Serializable}
import scala.collection.generic.Growable
-import scala.collection.Map
-import scala.collection.mutable
-import scala.ref.WeakReference
import scala.reflect.ClassTag
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils
+
/**
* A data type that can be accumulated, ie has an commutative and associative "add" operation,
* but where the result type, `R`, may be different from the element type being added, `T`.
@@ -166,6 +164,7 @@ class Accumulable[R, T] private[spark] (
override def toString: String = if (value_ == null) "null" else value_.toString
}
+
/**
* Helper object defining how to accumulate values of a particular type. An implicit
* AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
@@ -201,6 +200,7 @@ trait AccumulableParam[R, T] extends Serializable {
def zero(initialValue: R): R
}
+
private[spark] class
GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
extends AccumulableParam[R, T] {
@@ -224,176 +224,3 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
copy
}
}
-
-/**
- * A simpler value of [[Accumulable]] where the result type being accumulated is the same
- * as the types of elements being merged, i.e. variables that are only "added" to through an
- * associative operation and can therefore be efficiently supported in parallel. They can be used
- * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
- * value types, and programmers can add support for new types.
- *
- * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
- * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
- * However, they cannot read its value. Only the driver program can read the accumulator's value,
- * using its value method.
- *
- * The interpreter session below shows an accumulator being used to add up the elements of an array:
- *
- * {{{
- * scala> val accum = sc.accumulator(0)
- * accum: spark.Accumulator[Int] = 0
- *
- * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
- * ...
- * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
- *
- * scala> accum.value
- * res2: Int = 10
- * }}}
- *
- * @param initialValue initial value of accumulator
- * @param param helper object defining how to add elements of type `T`
- * @tparam T result type
- */
-class Accumulator[T] private[spark] (
- @transient private[spark] val initialValue: T,
- param: AccumulatorParam[T],
- name: Option[String],
- internal: Boolean)
- extends Accumulable[T, T](initialValue, param, name, internal) {
-
- def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
- this(initialValue, param, name, false)
- }
-
- def this(initialValue: T, param: AccumulatorParam[T]) = {
- this(initialValue, param, None, false)
- }
-}
-
-/**
- * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
- * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
- * available when you create Accumulators of a specific type.
- *
- * @tparam T type of value to accumulate
- */
-trait AccumulatorParam[T] extends AccumulableParam[T, T] {
- def addAccumulator(t1: T, t2: T): T = {
- addInPlace(t1, t2)
- }
-}
-
-object AccumulatorParam {
-
- // The following implicit objects were in SparkContext before 1.2 and users had to
- // `import SparkContext._` to enable them. Now we move them here to make the compiler find
- // them automatically. However, as there are duplicate codes in SparkContext for backward
- // compatibility, please update them accordingly if you modify the following implicit objects.
-
- implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
- def addInPlace(t1: Double, t2: Double): Double = t1 + t2
- def zero(initialValue: Double): Double = 0.0
- }
-
- implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
- def addInPlace(t1: Int, t2: Int): Int = t1 + t2
- def zero(initialValue: Int): Int = 0
- }
-
- implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
- def addInPlace(t1: Long, t2: Long): Long = t1 + t2
- def zero(initialValue: Long): Long = 0L
- }
-
- implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
- def addInPlace(t1: Float, t2: Float): Float = t1 + t2
- def zero(initialValue: Float): Float = 0f
- }
-
- // TODO: Add AccumulatorParams for other types, e.g. lists and strings
-}
-
-// TODO: The multi-thread support in accumulators is kind of lame; check
-// if there's a more intuitive way of doing it right
-private[spark] object Accumulators extends Logging {
- /**
- * This global map holds the original accumulator objects that are created on the driver.
- * It keeps weak references to these objects so that accumulators can be garbage-collected
- * once the RDDs and user-code that reference them are cleaned up.
- */
- val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
-
- private var lastId: Long = 0
-
- def newId(): Long = synchronized {
- lastId += 1
- lastId
- }
-
- def register(a: Accumulable[_, _]): Unit = synchronized {
- originals(a.id) = new WeakReference[Accumulable[_, _]](a)
- }
-
- def remove(accId: Long) {
- synchronized {
- originals.remove(accId)
- }
- }
-
- // Add values to the original accumulators with some given IDs
- def add(values: Map[Long, Any]): Unit = synchronized {
- for ((id, value) <- values) {
- if (originals.contains(id)) {
- // Since we are now storing weak references, we must check whether the underlying data
- // is valid.
- originals(id).get match {
- case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
- case None =>
- throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
- }
- } else {
- logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
- }
- }
- }
-
-}
-
-private[spark] object InternalAccumulator {
- val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
- val TEST_ACCUMULATOR = "testAccumulator"
-
- // For testing only.
- // This needs to be a def since we don't want to reuse the same accumulator across stages.
- private def maybeTestAccumulator: Option[Accumulator[Long]] = {
- if (sys.props.contains("spark.testing")) {
- Some(new Accumulator(
- 0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
- } else {
- None
- }
- }
-
- /**
- * Accumulators for tracking internal metrics.
- *
- * These accumulators are created with the stage such that all tasks in the stage will
- * add to the same set of accumulators. We do this to report the distribution of accumulator
- * values across all tasks within each stage.
- */
- def create(sc: SparkContext): Seq[Accumulator[Long]] = {
- val internalAccumulators = Seq(
- // Execution memory refers to the memory used by internal data structures created
- // during shuffles, aggregations and joins. The value of this accumulator should be
- // approximately the sum of the peak sizes across all such data structures created
- // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
- new Accumulator(
- 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
- ) ++ maybeTestAccumulator.toSeq
- internalAccumulators.foreach { accumulator =>
- sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
- }
- internalAccumulators
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
new file mode 100644
index 0000000000..007136e6ae
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+
+
+/**
+ * A simpler value of [[Accumulable]] where the result type being accumulated is the same
+ * as the types of elements being merged, i.e. variables that are only "added" to through an
+ * associative operation and can therefore be efficiently supported in parallel. They can be used
+ * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
+ * value types, and programmers can add support for new types.
+ *
+ * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
+ * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
+ * However, they cannot read its value. Only the driver program can read the accumulator's value,
+ * using its value method.
+ *
+ * The interpreter session below shows an accumulator being used to add up the elements of an array:
+ *
+ * {{{
+ * scala> val accum = sc.accumulator(0)
+ * accum: spark.Accumulator[Int] = 0
+ *
+ * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
+ * ...
+ * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
+ *
+ * scala> accum.value
+ * res2: Int = 10
+ * }}}
+ *
+ * @param initialValue initial value of accumulator
+ * @param param helper object defining how to add elements of type `T`
+ * @tparam T result type
+ */
+class Accumulator[T] private[spark] (
+ @transient private[spark] val initialValue: T,
+ param: AccumulatorParam[T],
+ name: Option[String],
+ internal: Boolean)
+ extends Accumulable[T, T](initialValue, param, name, internal) {
+
+ def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
+ this(initialValue, param, name, false)
+ }
+
+ def this(initialValue: T, param: AccumulatorParam[T]) = {
+ this(initialValue, param, None, false)
+ }
+}
+
+
+// TODO: The multi-thread support in accumulators is kind of lame; check
+// if there's a more intuitive way of doing it right
+private[spark] object Accumulators extends Logging {
+ /**
+ * This global map holds the original accumulator objects that are created on the driver.
+ * It keeps weak references to these objects so that accumulators can be garbage-collected
+ * once the RDDs and user-code that reference them are cleaned up.
+ */
+ val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
+
+ private var lastId: Long = 0
+
+ def newId(): Long = synchronized {
+ lastId += 1
+ lastId
+ }
+
+ def register(a: Accumulable[_, _]): Unit = synchronized {
+ originals(a.id) = new WeakReference[Accumulable[_, _]](a)
+ }
+
+ def remove(accId: Long) {
+ synchronized {
+ originals.remove(accId)
+ }
+ }
+
+ // Add values to the original accumulators with some given IDs
+ def add(values: Map[Long, Any]): Unit = synchronized {
+ for ((id, value) <- values) {
+ if (originals.contains(id)) {
+ // Since we are now storing weak references, we must check whether the underlying data
+ // is valid.
+ originals(id).get match {
+ case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
+ case None =>
+ throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
+ }
+ } else {
+ logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
+ }
+ }
+ }
+
+}
+
+
+/**
+ * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
+ * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
+ * available when you create Accumulators of a specific type.
+ *
+ * @tparam T type of value to accumulate
+ */
+trait AccumulatorParam[T] extends AccumulableParam[T, T] {
+ def addAccumulator(t1: T, t2: T): T = {
+ addInPlace(t1, t2)
+ }
+}
+
+
+object AccumulatorParam {
+
+ // The following implicit objects were in SparkContext before 1.2 and users had to
+ // `import SparkContext._` to enable them. Now we move them here to make the compiler find
+ // them automatically. However, as there are duplicate codes in SparkContext for backward
+ // compatibility, please update them accordingly if you modify the following implicit objects.
+
+ implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
+ def addInPlace(t1: Double, t2: Double): Double = t1 + t2
+ def zero(initialValue: Double): Double = 0.0
+ }
+
+ implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
+ def addInPlace(t1: Int, t2: Int): Int = t1 + t2
+ def zero(initialValue: Int): Int = 0
+ }
+
+ implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+ def addInPlace(t1: Long, t2: Long): Long = t1 + t2
+ def zero(initialValue: Long): Long = 0L
+ }
+
+ implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+ def addInPlace(t1: Float, t2: Float): Float = t1 + t2
+ def zero(initialValue: Float): Float = 0f
+ }
+
+ // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+}
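
To illustrate the API this file defines (a sketch only, not part of this commit): a user-defined AccumulatorParam only needs addInPlace and zero, since addAccumulator defaults to addInPlace. The sketch below assumes a Spark 1.x SparkContext named sc (e.g. in spark-shell).

import org.apache.spark.AccumulatorParam

// Accumulates Vector[Double] values element-wise (illustrative only).
object VectorAccumulatorParam extends AccumulatorParam[Vector[Double]] {
  // Merge two partial results by element-wise addition.
  def addInPlace(v1: Vector[Double], v2: Vector[Double]): Vector[Double] =
    v1.zip(v2).map { case (a, b) => a + b }
  // The zero value has the same length as the initial value.
  def zero(initialValue: Vector[Double]): Vector[Double] =
    Vector.fill(initialValue.length)(0.0)
}

// Tasks may only add to the accumulator; only the driver reads its value.
val vecAccum = sc.accumulator(Vector(0.0, 0.0))(VectorAccumulatorParam)
sc.parallelize(Seq(Vector(1.0, 2.0), Vector(3.0, 4.0))).foreach(v => vecAccum += v)
vecAccum.value  // Vector(4.0, 6.0)
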
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
new file mode 100644
index 0000000000..6ea997c079
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+
+// This is moved to its own file because many more things will be added to it in SPARK-10620.
+private[spark] object InternalAccumulator {
+ val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
+ val TEST_ACCUMULATOR = "testAccumulator"
+
+ // For testing only.
+ // This needs to be a def since we don't want to reuse the same accumulator across stages.
+ private def maybeTestAccumulator: Option[Accumulator[Long]] = {
+ if (sys.props.contains("spark.testing")) {
+ Some(new Accumulator(
+ 0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Accumulators for tracking internal metrics.
+ *
+ * These accumulators are created with the stage such that all tasks in the stage will
+ * add to the same set of accumulators. We do this to report the distribution of accumulator
+ * values across all tasks within each stage.
+ */
+ def create(sc: SparkContext): Seq[Accumulator[Long]] = {
+ val internalAccumulators = Seq(
+ // Execution memory refers to the memory used by internal data structures created
+ // during shuffles, aggregations and joins. The value of this accumulator should be
+ // approximately the sum of the peak sizes across all such data structures created
+ // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
+ new Accumulator(
+ 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
+ ) ++ maybeTestAccumulator.toSeq
+ internalAccumulators.foreach { accumulator =>
+ sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
+ }
+ internalAccumulators
+ }
+}
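
As context for how create(sc) is meant to be used (a hedged sketch, not part of this commit): stage-construction code obtains one fresh set of internal accumulators per stage, and every task in that stage adds to the same set. InternalAccumulator is private[spark], and the sketch assumes it is compiled inside the org.apache.spark package and that the accumulator's optional name field is accessible.

package org.apache.spark

// Illustrative sketch only: look up the per-stage peak-execution-memory
// accumulator from a freshly created set.
private[spark] object InternalAccumulatorUsageSketch {
  def peakMemoryAccumulator(sc: SparkContext): Option[Accumulator[Long]] = {
    val stageAccums = InternalAccumulator.create(sc)  // also registers them for cleanup
    stageAccums.find(_.name.exists(_ == InternalAccumulator.PEAK_EXECUTION_MEMORY))
  }
}
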
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
new file mode 100644
index 0000000000..8f1d7f89a4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * Method by which input data was read. Network means that the data was read over the network
+ * from a remote block manager (which may have stored the data on-disk or in-memory).
+ */
+@DeveloperApi
+object DataReadMethod extends Enumeration with Serializable {
+ type DataReadMethod = Value
+ val Memory, Disk, Hadoop, Network = Value
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * Metrics about reading input data.
+ */
+@DeveloperApi
+case class InputMetrics(readMethod: DataReadMethod.Value) {
+
+ /**
+ * This is volatile so that it is visible to the updater thread.
+ */
+ @volatile @transient var bytesReadCallback: Option[() => Long] = None
+
+ /**
+ * Total bytes read.
+ */
+ private var _bytesRead: Long = _
+ def bytesRead: Long = _bytesRead
+ def incBytesRead(bytes: Long): Unit = _bytesRead += bytes
+
+ /**
+ * Total records read.
+ */
+ private var _recordsRead: Long = _
+ def recordsRead: Long = _recordsRead
+ def incRecordsRead(records: Long): Unit = _recordsRead += records
+
+ /**
+ * Invoke the bytesReadCallback and mutate bytesRead.
+ */
+ def updateBytesRead() {
+ bytesReadCallback.foreach { c =>
+ _bytesRead = c()
+ }
+ }
+
+ /**
+ * Register a function that can be called to get up-to-date information on how many bytes the task
+ * has read from an input source.
+ */
+ def setBytesReadCallback(f: Option[() => Long]) {
+ bytesReadCallback = f
+ }
+}
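
The bytesReadCallback mechanism lets a reader poll an external byte counter instead of incrementing bytesRead itself. A minimal sketch of the wiring (illustrative only; bytesReadSoFar is a hypothetical stand-in for something like Hadoop FileSystem read statistics):

import org.apache.spark.executor.{DataReadMethod, InputMetrics}

// Hypothetical external byte counter.
def bytesReadSoFar(): Long = 0L

val inputMetrics = InputMetrics(DataReadMethod.Hadoop)
inputMetrics.setBytesReadCallback(Some(() => bytesReadSoFar()))

// Record counts are incremented directly; byte totals are refreshed by
// polling the registered callback.
inputMetrics.incRecordsRead(1L)
inputMetrics.updateBytesRead()
inputMetrics.bytesRead  // reflects the latest value returned by the callback
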
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
new file mode 100644
index 0000000000..ad132d004c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * Method by which output data was written.
+ */
+@DeveloperApi
+object DataWriteMethod extends Enumeration with Serializable {
+ type DataWriteMethod = Value
+ val Hadoop = Value
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * Metrics about writing output data.
+ */
+@DeveloperApi
+case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
+ /**
+ * Total bytes written
+ */
+ private var _bytesWritten: Long = _
+ def bytesWritten: Long = _bytesWritten
+ private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value
+
+ /**
+ * Total records written
+ */
+ private var _recordsWritten: Long = 0L
+ def recordsWritten: Long = _recordsWritten
+ private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
new file mode 100644
index 0000000000..e985b35ace
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * Metrics pertaining to shuffle data read in a given task.
+ */
+@DeveloperApi
+class ShuffleReadMetrics extends Serializable {
+ /**
+ * Number of remote blocks fetched in this shuffle by this task
+ */
+ private var _remoteBlocksFetched: Int = _
+ def remoteBlocksFetched: Int = _remoteBlocksFetched
+ private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
+ private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
+
+ /**
+ * Number of local blocks fetched in this shuffle by this task
+ */
+ private var _localBlocksFetched: Int = _
+ def localBlocksFetched: Int = _localBlocksFetched
+ private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
+ private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
+
+ /**
+ * Time the task spent waiting for remote shuffle blocks. This only includes the time
+ * blocking on shuffle input data. For instance if block B is being fetched while the task is
+ * still not finished processing block A, it is not considered to be blocking on block B.
+ */
+ private var _fetchWaitTime: Long = _
+ def fetchWaitTime: Long = _fetchWaitTime
+ private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
+ private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
+
+ /**
+ * Total number of remote bytes read from the shuffle by this task
+ */
+ private var _remoteBytesRead: Long = _
+ def remoteBytesRead: Long = _remoteBytesRead
+ private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
+ private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
+
+ /**
+ * Shuffle data that was read from the local disk (as opposed to from a remote executor).
+ */
+ private var _localBytesRead: Long = _
+ def localBytesRead: Long = _localBytesRead
+ private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
+
+ /**
+ * Total bytes fetched in the shuffle by this task (both remote and local).
+ */
+ def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
+
+ /**
+ * Number of blocks fetched in this shuffle by this task (remote or local)
+ */
+ def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
+
+ /**
+ * Total number of records read from the shuffle by this task
+ */
+ private var _recordsRead: Long = _
+ def recordsRead: Long = _recordsRead
+ private[spark] def incRecordsRead(value: Long) = _recordsRead += value
+ private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
+}
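
The derived totals are plain sums of the remote and local counters, as the short sketch below shows (illustrative only; the inc* methods are private[spark], so the sketch assumes it is compiled under the org.apache.spark.executor package).

package org.apache.spark.executor

// Illustrative sketch only: totalBlocksFetched and totalBytesRead combine
// the remote and local counters.
private[spark] object ShuffleReadMetricsSketch {
  def totalsExample(): Unit = {
    val metrics = new ShuffleReadMetrics
    metrics.incRemoteBlocksFetched(2)
    metrics.incLocalBlocksFetched(3)
    metrics.incRemoteBytesRead(2048L)
    metrics.incLocalBytesRead(1024L)
    assert(metrics.totalBlocksFetched == 5)  // remote + local blocks
    assert(metrics.totalBytesRead == 3072L)  // remote + local bytes
  }
}
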
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
new file mode 100644
index 0000000000..469ebe26c7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * Metrics pertaining to shuffle data written in a given task.
+ */
+@DeveloperApi
+class ShuffleWriteMetrics extends Serializable {
+ /**
+ * Number of bytes written for the shuffle by this task
+ */
+ @volatile private var _shuffleBytesWritten: Long = _
+ def shuffleBytesWritten: Long = _shuffleBytesWritten
+ private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
+ private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
+
+ /**
+ * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
+ */
+ @volatile private var _shuffleWriteTime: Long = _
+ def shuffleWriteTime: Long = _shuffleWriteTime
+ private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
+ private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
+
+ /**
+ * Total number of records written to the shuffle by this task
+ */
+ @volatile private var _shuffleRecordsWritten: Long = _
+ def shuffleRecordsWritten: Long = _shuffleRecordsWritten
+ private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
+ private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
+ private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
+}
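
Since shuffle write time is tracked in nanoseconds, callers typically bracket the actual disk write with System.nanoTime. A hedged sketch (writeChunkToDisk is a hypothetical helper; the inc* methods are private[spark], so the sketch assumes the org.apache.spark.executor package):

package org.apache.spark.executor

// Illustrative sketch only: record bytes, records, and elapsed write time.
private[spark] object ShuffleWriteMetricsSketch {
  def writeAndRecord(metrics: ShuffleWriteMetrics, chunk: Array[Byte]): Unit = {
    val start = System.nanoTime()
    // writeChunkToDisk(chunk)  // hypothetical disk write
    metrics.incShuffleBytesWritten(chunk.length.toLong)
    metrics.incShuffleRecordsWritten(1L)
    metrics.incShuffleWriteTime(System.nanoTime() - start)
  }
}
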
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 42207a9553..ce1fcbff71 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -27,6 +27,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.util.Utils
+
/**
* :: DeveloperApi ::
* Metrics tracked during the execution of a task.
@@ -241,6 +242,7 @@ class TaskMetrics extends Serializable {
}
}
+
private[spark] object TaskMetrics {
private val hostNameCache = new ConcurrentHashMap[String, String]()
@@ -251,187 +253,3 @@ private[spark] object TaskMetrics {
if (canonicalHost != null) canonicalHost else host
}
}
-
-/**
- * :: DeveloperApi ::
- * Method by which input data was read. Network means that the data was read over the network
- * from a remote block manager (which may have stored the data on-disk or in-memory).
- */
-@DeveloperApi
-object DataReadMethod extends Enumeration with Serializable {
- type DataReadMethod = Value
- val Memory, Disk, Hadoop, Network = Value
-}
-
-/**
- * :: DeveloperApi ::
- * Method by which output data was written.
- */
-@DeveloperApi
-object DataWriteMethod extends Enumeration with Serializable {
- type DataWriteMethod = Value
- val Hadoop = Value
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics about reading input data.
- */
-@DeveloperApi
-case class InputMetrics(readMethod: DataReadMethod.Value) {
-
- /**
- * This is volatile so that it is visible to the updater thread.
- */
- @volatile @transient var bytesReadCallback: Option[() => Long] = None
-
- /**
- * Total bytes read.
- */
- private var _bytesRead: Long = _
- def bytesRead: Long = _bytesRead
- def incBytesRead(bytes: Long): Unit = _bytesRead += bytes
-
- /**
- * Total records read.
- */
- private var _recordsRead: Long = _
- def recordsRead: Long = _recordsRead
- def incRecordsRead(records: Long): Unit = _recordsRead += records
-
- /**
- * Invoke the bytesReadCallback and mutate bytesRead.
- */
- def updateBytesRead() {
- bytesReadCallback.foreach { c =>
- _bytesRead = c()
- }
- }
-
- /**
- * Register a function that can be called to get up-to-date information on how many bytes the task
- * has read from an input source.
- */
- def setBytesReadCallback(f: Option[() => Long]) {
- bytesReadCallback = f
- }
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics about writing output data.
- */
-@DeveloperApi
-case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
- /**
- * Total bytes written
- */
- private var _bytesWritten: Long = _
- def bytesWritten: Long = _bytesWritten
- private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value
-
- /**
- * Total records written
- */
- private var _recordsWritten: Long = 0L
- def recordsWritten: Long = _recordsWritten
- private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics pertaining to shuffle data read in a given task.
- */
-@DeveloperApi
-class ShuffleReadMetrics extends Serializable {
- /**
- * Number of remote blocks fetched in this shuffle by this task
- */
- private var _remoteBlocksFetched: Int = _
- def remoteBlocksFetched: Int = _remoteBlocksFetched
- private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
- private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
-
- /**
- * Number of local blocks fetched in this shuffle by this task
- */
- private var _localBlocksFetched: Int = _
- def localBlocksFetched: Int = _localBlocksFetched
- private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
- private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
-
- /**
- * Time the task spent waiting for remote shuffle blocks. This only includes the time
- * blocking on shuffle input data. For instance if block B is being fetched while the task is
- * still not finished processing block A, it is not considered to be blocking on block B.
- */
- private var _fetchWaitTime: Long = _
- def fetchWaitTime: Long = _fetchWaitTime
- private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
- private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
-
- /**
- * Total number of remote bytes read from the shuffle by this task
- */
- private var _remoteBytesRead: Long = _
- def remoteBytesRead: Long = _remoteBytesRead
- private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
- private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
-
- /**
- * Shuffle data that was read from the local disk (as opposed to from a remote executor).
- */
- private var _localBytesRead: Long = _
- def localBytesRead: Long = _localBytesRead
- private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
-
- /**
- * Total bytes fetched in the shuffle by this task (both remote and local).
- */
- def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
-
- /**
- * Number of blocks fetched in this shuffle by this task (remote or local)
- */
- def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
-
- /**
- * Total number of records read from the shuffle by this task
- */
- private var _recordsRead: Long = _
- def recordsRead: Long = _recordsRead
- private[spark] def incRecordsRead(value: Long) = _recordsRead += value
- private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics pertaining to shuffle data written in a given task.
- */
-@DeveloperApi
-class ShuffleWriteMetrics extends Serializable {
- /**
- * Number of bytes written for the shuffle by this task
- */
- @volatile private var _shuffleBytesWritten: Long = _
- def shuffleBytesWritten: Long = _shuffleBytesWritten
- private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
- private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
-
- /**
- * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
- */
- @volatile private var _shuffleWriteTime: Long = _
- def shuffleWriteTime: Long = _shuffleWriteTime
- private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
- private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
-
- /**
- * Total number of records written to the shuffle by this task
- */
- @volatile private var _shuffleRecordsWritten: Long = _
- def shuffleRecordsWritten: Long = _shuffleRecordsWritten
- private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
- private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
- private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
-}