author     Andrew Or <andrew@databricks.com>    2016-01-18 13:27:18 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-01-18 13:27:18 -0800
commit     302bb569f3e1f09e2e7cea5e4e7f5c6953b0fc82
tree       cc431e6dc190a213dc5633984a0dbf66323e6a19 /core
parent     5e492e9d5bc0992cbcffe64a9aaf3b334b173d2c
[SPARK-12884] Move classes to their own files for readability
This is a small step toward implementing SPARK-10620, which migrates `TaskMetrics` to accumulators. This patch is strictly a cleanup and introduces no change in functionality: it just moves classes into their own files, so that no single monolithic file contains 10 different classes.
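For context, the accumulator API that the moved files document works as in the sketch below. This is a minimal, illustrative sketch against the Spark 1.x API shown in the moved scaladoc; the app name and `local[*]` master are assumptions, not part of the patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorExample {
  def main(args: Array[String]): Unit = {
    // Illustrative local setup; the app name and master are assumptions.
    val conf = new SparkConf().setAppName("accumulator-example").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // An accumulator is created from an initial value on the driver.
    val accum = sc.accumulator(0)

    // Tasks on the cluster can only add to it; they cannot read its value.
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

    // Only the driver program can read the accumulator's value.
    println(accum.value) // 10

    sc.stop()
  }
}
```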
Parent PR: #10717
Author: Andrew Or <andrew@databricks.com>
Closes #10810 from andrewor14/move-things.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulable.scala (renamed from core/src/main/scala/org/apache/spark/Accumulators.scala)  179
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulator.scala  160
-rw-r--r--  core/src/main/scala/org/apache/spark/InternalAccumulator.scala  58
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/InputMetrics.scala  77
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala  53
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala  87
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala  53
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala  186
8 files changed, 493 insertions, 360 deletions
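Before the per-file diffs, a note on the `AccumulatorParam` trait that moves into Accumulator.scala below: a user-defined param only needs `addInPlace` and `zero`, since `addAccumulator` defaults to `addInPlace`. The following sketch addresses the trait's TODO about strings; `StringConcatParam` is a hypothetical name, not part of this patch.

```scala
import org.apache.spark.AccumulatorParam

// Hypothetical param that accumulates strings by concatenation. Concatenation
// is associative but not commutative, so use it only where the merge order is
// irrelevant to the consumer.
object StringConcatParam extends AccumulatorParam[String] {
  // Merge a new value (or partial result) into an existing one.
  def addInPlace(t1: String, t2: String): String = t1 + t2

  // Identity element for the merge; the initial value is ignored.
  def zero(initialValue: String): String = ""
}

// Usage, given a SparkContext `sc`:
//   val acc = sc.accumulator("")(StringConcatParam)
//   sc.parallelize(Seq("a", "b", "c")).foreach(s => acc += s)
//   acc.value  // the elements in some order, e.g. "abc"
```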
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala index 5592b75afb..a456d420b8 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulable.scala @@ -20,14 +20,12 @@ package org.apache.spark import java.io.{ObjectInputStream, Serializable} import scala.collection.generic.Growable -import scala.collection.Map -import scala.collection.mutable -import scala.ref.WeakReference import scala.reflect.ClassTag import org.apache.spark.serializer.JavaSerializer import org.apache.spark.util.Utils + /** * A data type that can be accumulated, ie has an commutative and associative "add" operation, * but where the result type, `R`, may be different from the element type being added, `T`. @@ -166,6 +164,7 @@ class Accumulable[R, T] private[spark] ( override def toString: String = if (value_ == null) "null" else value_.toString } + /** * Helper object defining how to accumulate values of a particular type. An implicit * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type. @@ -201,6 +200,7 @@ trait AccumulableParam[R, T] extends Serializable { def zero(initialValue: R): R } + private[spark] class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T] extends AccumulableParam[R, T] { @@ -224,176 +224,3 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa copy } } - -/** - * A simpler value of [[Accumulable]] where the result type being accumulated is the same - * as the types of elements being merged, i.e. variables that are only "added" to through an - * associative operation and can therefore be efficiently supported in parallel. They can be used - * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric - * value types, and programmers can add support for new types. - * - * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]]. - * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator. - * However, they cannot read its value. Only the driver program can read the accumulator's value, - * using its value method. - * - * The interpreter session below shows an accumulator being used to add up the elements of an array: - * - * {{{ - * scala> val accum = sc.accumulator(0) - * accum: spark.Accumulator[Int] = 0 - * - * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) - * ... - * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s - * - * scala> accum.value - * res2: Int = 10 - * }}} - * - * @param initialValue initial value of accumulator - * @param param helper object defining how to add elements of type `T` - * @tparam T result type - */ -class Accumulator[T] private[spark] ( - @transient private[spark] val initialValue: T, - param: AccumulatorParam[T], - name: Option[String], - internal: Boolean) - extends Accumulable[T, T](initialValue, param, name, internal) { - - def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = { - this(initialValue, param, name, false) - } - - def this(initialValue: T, param: AccumulatorParam[T]) = { - this(initialValue, param, None, false) - } -} - -/** - * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add - * in is the same type as the accumulated value. 
An implicit AccumulatorParam object needs to be - * available when you create Accumulators of a specific type. - * - * @tparam T type of value to accumulate - */ -trait AccumulatorParam[T] extends AccumulableParam[T, T] { - def addAccumulator(t1: T, t2: T): T = { - addInPlace(t1, t2) - } -} - -object AccumulatorParam { - - // The following implicit objects were in SparkContext before 1.2 and users had to - // `import SparkContext._` to enable them. Now we move them here to make the compiler find - // them automatically. However, as there are duplicate codes in SparkContext for backward - // compatibility, please update them accordingly if you modify the following implicit objects. - - implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { - def addInPlace(t1: Double, t2: Double): Double = t1 + t2 - def zero(initialValue: Double): Double = 0.0 - } - - implicit object IntAccumulatorParam extends AccumulatorParam[Int] { - def addInPlace(t1: Int, t2: Int): Int = t1 + t2 - def zero(initialValue: Int): Int = 0 - } - - implicit object LongAccumulatorParam extends AccumulatorParam[Long] { - def addInPlace(t1: Long, t2: Long): Long = t1 + t2 - def zero(initialValue: Long): Long = 0L - } - - implicit object FloatAccumulatorParam extends AccumulatorParam[Float] { - def addInPlace(t1: Float, t2: Float): Float = t1 + t2 - def zero(initialValue: Float): Float = 0f - } - - // TODO: Add AccumulatorParams for other types, e.g. lists and strings -} - -// TODO: The multi-thread support in accumulators is kind of lame; check -// if there's a more intuitive way of doing it right -private[spark] object Accumulators extends Logging { - /** - * This global map holds the original accumulator objects that are created on the driver. - * It keeps weak references to these objects so that accumulators can be garbage-collected - * once the RDDs and user-code that reference them are cleaned up. - */ - val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]() - - private var lastId: Long = 0 - - def newId(): Long = synchronized { - lastId += 1 - lastId - } - - def register(a: Accumulable[_, _]): Unit = synchronized { - originals(a.id) = new WeakReference[Accumulable[_, _]](a) - } - - def remove(accId: Long) { - synchronized { - originals.remove(accId) - } - } - - // Add values to the original accumulators with some given IDs - def add(values: Map[Long, Any]): Unit = synchronized { - for ((id, value) <- values) { - if (originals.contains(id)) { - // Since we are now storing weak references, we must check whether the underlying data - // is valid. - originals(id).get match { - case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value - case None => - throw new IllegalAccessError("Attempted to access garbage collected Accumulator.") - } - } else { - logWarning(s"Ignoring accumulator update for unknown accumulator id $id") - } - } - } - -} - -private[spark] object InternalAccumulator { - val PEAK_EXECUTION_MEMORY = "peakExecutionMemory" - val TEST_ACCUMULATOR = "testAccumulator" - - // For testing only. - // This needs to be a def since we don't want to reuse the same accumulator across stages. - private def maybeTestAccumulator: Option[Accumulator[Long]] = { - if (sys.props.contains("spark.testing")) { - Some(new Accumulator( - 0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true)) - } else { - None - } - } - - /** - * Accumulators for tracking internal metrics. 
- * - * These accumulators are created with the stage such that all tasks in the stage will - * add to the same set of accumulators. We do this to report the distribution of accumulator - * values across all tasks within each stage. - */ - def create(sc: SparkContext): Seq[Accumulator[Long]] = { - val internalAccumulators = Seq( - // Execution memory refers to the memory used by internal data structures created - // during shuffles, aggregations and joins. The value of this accumulator should be - // approximately the sum of the peak sizes across all such data structures created - // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort. - new Accumulator( - 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true) - ) ++ maybeTestAccumulator.toSeq - internalAccumulators.foreach { accumulator => - sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator)) - } - internalAccumulators - } -} diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala new file mode 100644 index 0000000000..007136e6ae --- /dev/null +++ b/core/src/main/scala/org/apache/spark/Accumulator.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.collection.{mutable, Map} +import scala.ref.WeakReference + + +/** + * A simpler value of [[Accumulable]] where the result type being accumulated is the same + * as the types of elements being merged, i.e. variables that are only "added" to through an + * associative operation and can therefore be efficiently supported in parallel. They can be used + * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric + * value types, and programmers can add support for new types. + * + * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]]. + * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator. + * However, they cannot read its value. Only the driver program can read the accumulator's value, + * using its value method. + * + * The interpreter session below shows an accumulator being used to add up the elements of an array: + * + * {{{ + * scala> val accum = sc.accumulator(0) + * accum: spark.Accumulator[Int] = 0 + * + * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) + * ... 
+ * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s + * + * scala> accum.value + * res2: Int = 10 + * }}} + * + * @param initialValue initial value of accumulator + * @param param helper object defining how to add elements of type `T` + * @tparam T result type + */ +class Accumulator[T] private[spark] ( + @transient private[spark] val initialValue: T, + param: AccumulatorParam[T], + name: Option[String], + internal: Boolean) + extends Accumulable[T, T](initialValue, param, name, internal) { + + def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = { + this(initialValue, param, name, false) + } + + def this(initialValue: T, param: AccumulatorParam[T]) = { + this(initialValue, param, None, false) + } +} + + +// TODO: The multi-thread support in accumulators is kind of lame; check +// if there's a more intuitive way of doing it right +private[spark] object Accumulators extends Logging { + /** + * This global map holds the original accumulator objects that are created on the driver. + * It keeps weak references to these objects so that accumulators can be garbage-collected + * once the RDDs and user-code that reference them are cleaned up. + */ + val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]() + + private var lastId: Long = 0 + + def newId(): Long = synchronized { + lastId += 1 + lastId + } + + def register(a: Accumulable[_, _]): Unit = synchronized { + originals(a.id) = new WeakReference[Accumulable[_, _]](a) + } + + def remove(accId: Long) { + synchronized { + originals.remove(accId) + } + } + + // Add values to the original accumulators with some given IDs + def add(values: Map[Long, Any]): Unit = synchronized { + for ((id, value) <- values) { + if (originals.contains(id)) { + // Since we are now storing weak references, we must check whether the underlying data + // is valid. + originals(id).get match { + case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value + case None => + throw new IllegalAccessError("Attempted to access garbage collected Accumulator.") + } + } else { + logWarning(s"Ignoring accumulator update for unknown accumulator id $id") + } + } + } + +} + + +/** + * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add + * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be + * available when you create Accumulators of a specific type. + * + * @tparam T type of value to accumulate + */ +trait AccumulatorParam[T] extends AccumulableParam[T, T] { + def addAccumulator(t1: T, t2: T): T = { + addInPlace(t1, t2) + } +} + + +object AccumulatorParam { + + // The following implicit objects were in SparkContext before 1.2 and users had to + // `import SparkContext._` to enable them. Now we move them here to make the compiler find + // them automatically. However, as there are duplicate codes in SparkContext for backward + // compatibility, please update them accordingly if you modify the following implicit objects. 
+ + implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { + def addInPlace(t1: Double, t2: Double): Double = t1 + t2 + def zero(initialValue: Double): Double = 0.0 + } + + implicit object IntAccumulatorParam extends AccumulatorParam[Int] { + def addInPlace(t1: Int, t2: Int): Int = t1 + t2 + def zero(initialValue: Int): Int = 0 + } + + implicit object LongAccumulatorParam extends AccumulatorParam[Long] { + def addInPlace(t1: Long, t2: Long): Long = t1 + t2 + def zero(initialValue: Long): Long = 0L + } + + implicit object FloatAccumulatorParam extends AccumulatorParam[Float] { + def addInPlace(t1: Float, t2: Float): Float = t1 + t2 + def zero(initialValue: Float): Float = 0f + } + + // TODO: Add AccumulatorParams for other types, e.g. lists and strings +} diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala new file mode 100644 index 0000000000..6ea997c079 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + + +// This is moved to its own file because many more things will be added to it in SPARK-10620. +private[spark] object InternalAccumulator { + val PEAK_EXECUTION_MEMORY = "peakExecutionMemory" + val TEST_ACCUMULATOR = "testAccumulator" + + // For testing only. + // This needs to be a def since we don't want to reuse the same accumulator across stages. + private def maybeTestAccumulator: Option[Accumulator[Long]] = { + if (sys.props.contains("spark.testing")) { + Some(new Accumulator( + 0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true)) + } else { + None + } + } + + /** + * Accumulators for tracking internal metrics. + * + * These accumulators are created with the stage such that all tasks in the stage will + * add to the same set of accumulators. We do this to report the distribution of accumulator + * values across all tasks within each stage. + */ + def create(sc: SparkContext): Seq[Accumulator[Long]] = { + val internalAccumulators = Seq( + // Execution memory refers to the memory used by internal data structures created + // during shuffles, aggregations and joins. The value of this accumulator should be + // approximately the sum of the peak sizes across all such data structures created + // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort. 
+ new Accumulator( + 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true) + ) ++ maybeTestAccumulator.toSeq + internalAccumulators.foreach { accumulator => + sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator)) + } + internalAccumulators + } +} diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala new file mode 100644 index 0000000000..8f1d7f89a4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi + + +/** + * :: DeveloperApi :: + * Method by which input data was read. Network means that the data was read over the network + * from a remote block manager (which may have stored the data on-disk or in-memory). + */ +@DeveloperApi +object DataReadMethod extends Enumeration with Serializable { + type DataReadMethod = Value + val Memory, Disk, Hadoop, Network = Value +} + + +/** + * :: DeveloperApi :: + * Metrics about reading input data. + */ +@DeveloperApi +case class InputMetrics(readMethod: DataReadMethod.Value) { + + /** + * This is volatile so that it is visible to the updater thread. + */ + @volatile @transient var bytesReadCallback: Option[() => Long] = None + + /** + * Total bytes read. + */ + private var _bytesRead: Long = _ + def bytesRead: Long = _bytesRead + def incBytesRead(bytes: Long): Unit = _bytesRead += bytes + + /** + * Total records read. + */ + private var _recordsRead: Long = _ + def recordsRead: Long = _recordsRead + def incRecordsRead(records: Long): Unit = _recordsRead += records + + /** + * Invoke the bytesReadCallback and mutate bytesRead. + */ + def updateBytesRead() { + bytesReadCallback.foreach { c => + _bytesRead = c() + } + } + + /** + * Register a function that can be called to get up-to-date information on how many bytes the task + * has read from an input source. + */ + def setBytesReadCallback(f: Option[() => Long]) { + bytesReadCallback = f + } +} diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala new file mode 100644 index 0000000000..ad132d004c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi + + +/** + * :: DeveloperApi :: + * Method by which output data was written. + */ +@DeveloperApi +object DataWriteMethod extends Enumeration with Serializable { + type DataWriteMethod = Value + val Hadoop = Value +} + + +/** + * :: DeveloperApi :: + * Metrics about writing output data. + */ +@DeveloperApi +case class OutputMetrics(writeMethod: DataWriteMethod.Value) { + /** + * Total bytes written + */ + private var _bytesWritten: Long = _ + def bytesWritten: Long = _bytesWritten + private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value + + /** + * Total records written + */ + private var _recordsWritten: Long = 0L + def recordsWritten: Long = _recordsWritten + private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value +} diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala new file mode 100644 index 0000000000..e985b35ace --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi + + +/** + * :: DeveloperApi :: + * Metrics pertaining to shuffle data read in a given task. 
+ */ +@DeveloperApi +class ShuffleReadMetrics extends Serializable { + /** + * Number of remote blocks fetched in this shuffle by this task + */ + private var _remoteBlocksFetched: Int = _ + def remoteBlocksFetched: Int = _remoteBlocksFetched + private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value + private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value + + /** + * Number of local blocks fetched in this shuffle by this task + */ + private var _localBlocksFetched: Int = _ + def localBlocksFetched: Int = _localBlocksFetched + private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value + private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value + + /** + * Time the task spent waiting for remote shuffle blocks. This only includes the time + * blocking on shuffle input data. For instance if block B is being fetched while the task is + * still not finished processing block A, it is not considered to be blocking on block B. + */ + private var _fetchWaitTime: Long = _ + def fetchWaitTime: Long = _fetchWaitTime + private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value + private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value + + /** + * Total number of remote bytes read from the shuffle by this task + */ + private var _remoteBytesRead: Long = _ + def remoteBytesRead: Long = _remoteBytesRead + private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value + private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value + + /** + * Shuffle data that was read from the local disk (as opposed to from a remote executor). + */ + private var _localBytesRead: Long = _ + def localBytesRead: Long = _localBytesRead + private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value + + /** + * Total bytes fetched in the shuffle by this task (both remote and local). + */ + def totalBytesRead: Long = _remoteBytesRead + _localBytesRead + + /** + * Number of blocks fetched in this shuffle by this task (remote or local) + */ + def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched + + /** + * Total number of records read from the shuffle by this task + */ + private var _recordsRead: Long = _ + def recordsRead: Long = _recordsRead + private[spark] def incRecordsRead(value: Long) = _recordsRead += value + private[spark] def decRecordsRead(value: Long) = _recordsRead -= value +} diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala new file mode 100644 index 0000000000..469ebe26c7 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi + + +/** + * :: DeveloperApi :: + * Metrics pertaining to shuffle data written in a given task. + */ +@DeveloperApi +class ShuffleWriteMetrics extends Serializable { + /** + * Number of bytes written for the shuffle by this task + */ + @volatile private var _shuffleBytesWritten: Long = _ + def shuffleBytesWritten: Long = _shuffleBytesWritten + private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value + private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value + + /** + * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds + */ + @volatile private var _shuffleWriteTime: Long = _ + def shuffleWriteTime: Long = _shuffleWriteTime + private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value + private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value + + /** + * Total number of records written to the shuffle by this task + */ + @volatile private var _shuffleRecordsWritten: Long = _ + def shuffleRecordsWritten: Long = _shuffleRecordsWritten + private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value + private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value + private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value +} diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 42207a9553..ce1fcbff71 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -27,6 +27,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod import org.apache.spark.storage.{BlockId, BlockStatus} import org.apache.spark.util.Utils + /** * :: DeveloperApi :: * Metrics tracked during the execution of a task. @@ -241,6 +242,7 @@ class TaskMetrics extends Serializable { } } + private[spark] object TaskMetrics { private val hostNameCache = new ConcurrentHashMap[String, String]() @@ -251,187 +253,3 @@ private[spark] object TaskMetrics { if (canonicalHost != null) canonicalHost else host } } - -/** - * :: DeveloperApi :: - * Method by which input data was read. Network means that the data was read over the network - * from a remote block manager (which may have stored the data on-disk or in-memory). - */ -@DeveloperApi -object DataReadMethod extends Enumeration with Serializable { - type DataReadMethod = Value - val Memory, Disk, Hadoop, Network = Value -} - -/** - * :: DeveloperApi :: - * Method by which output data was written. - */ -@DeveloperApi -object DataWriteMethod extends Enumeration with Serializable { - type DataWriteMethod = Value - val Hadoop = Value -} - -/** - * :: DeveloperApi :: - * Metrics about reading input data. - */ -@DeveloperApi -case class InputMetrics(readMethod: DataReadMethod.Value) { - - /** - * This is volatile so that it is visible to the updater thread. - */ - @volatile @transient var bytesReadCallback: Option[() => Long] = None - - /** - * Total bytes read. - */ - private var _bytesRead: Long = _ - def bytesRead: Long = _bytesRead - def incBytesRead(bytes: Long): Unit = _bytesRead += bytes - - /** - * Total records read. 
- */ - private var _recordsRead: Long = _ - def recordsRead: Long = _recordsRead - def incRecordsRead(records: Long): Unit = _recordsRead += records - - /** - * Invoke the bytesReadCallback and mutate bytesRead. - */ - def updateBytesRead() { - bytesReadCallback.foreach { c => - _bytesRead = c() - } - } - - /** - * Register a function that can be called to get up-to-date information on how many bytes the task - * has read from an input source. - */ - def setBytesReadCallback(f: Option[() => Long]) { - bytesReadCallback = f - } -} - -/** - * :: DeveloperApi :: - * Metrics about writing output data. - */ -@DeveloperApi -case class OutputMetrics(writeMethod: DataWriteMethod.Value) { - /** - * Total bytes written - */ - private var _bytesWritten: Long = _ - def bytesWritten: Long = _bytesWritten - private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value - - /** - * Total records written - */ - private var _recordsWritten: Long = 0L - def recordsWritten: Long = _recordsWritten - private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value -} - -/** - * :: DeveloperApi :: - * Metrics pertaining to shuffle data read in a given task. - */ -@DeveloperApi -class ShuffleReadMetrics extends Serializable { - /** - * Number of remote blocks fetched in this shuffle by this task - */ - private var _remoteBlocksFetched: Int = _ - def remoteBlocksFetched: Int = _remoteBlocksFetched - private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value - private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value - - /** - * Number of local blocks fetched in this shuffle by this task - */ - private var _localBlocksFetched: Int = _ - def localBlocksFetched: Int = _localBlocksFetched - private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value - private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value - - /** - * Time the task spent waiting for remote shuffle blocks. This only includes the time - * blocking on shuffle input data. For instance if block B is being fetched while the task is - * still not finished processing block A, it is not considered to be blocking on block B. - */ - private var _fetchWaitTime: Long = _ - def fetchWaitTime: Long = _fetchWaitTime - private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value - private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value - - /** - * Total number of remote bytes read from the shuffle by this task - */ - private var _remoteBytesRead: Long = _ - def remoteBytesRead: Long = _remoteBytesRead - private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value - private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value - - /** - * Shuffle data that was read from the local disk (as opposed to from a remote executor). - */ - private var _localBytesRead: Long = _ - def localBytesRead: Long = _localBytesRead - private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value - - /** - * Total bytes fetched in the shuffle by this task (both remote and local). 
- */ - def totalBytesRead: Long = _remoteBytesRead + _localBytesRead - - /** - * Number of blocks fetched in this shuffle by this task (remote or local) - */ - def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched - - /** - * Total number of records read from the shuffle by this task - */ - private var _recordsRead: Long = _ - def recordsRead: Long = _recordsRead - private[spark] def incRecordsRead(value: Long) = _recordsRead += value - private[spark] def decRecordsRead(value: Long) = _recordsRead -= value -} - -/** - * :: DeveloperApi :: - * Metrics pertaining to shuffle data written in a given task. - */ -@DeveloperApi -class ShuffleWriteMetrics extends Serializable { - /** - * Number of bytes written for the shuffle by this task - */ - @volatile private var _shuffleBytesWritten: Long = _ - def shuffleBytesWritten: Long = _shuffleBytesWritten - private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value - private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value - - /** - * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds - */ - @volatile private var _shuffleWriteTime: Long = _ - def shuffleWriteTime: Long = _shuffleWriteTime - private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value - private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value - - /** - * Total number of records written to the shuffle by this task - */ - @volatile private var _shuffleRecordsWritten: Long = _ - def shuffleRecordsWritten: Long = _shuffleRecordsWritten - private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value - private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value - private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value -} |
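To make the moved metrics classes concrete, here is a small hedged sketch that exercises `InputMetrics` as defined in this patch. Standalone construction like this is for illustration only; in practice Spark creates and updates these objects internally.

```scala
import org.apache.spark.executor.{DataReadMethod, InputMetrics}

object InputMetricsSketch {
  def main(args: Array[String]): Unit = {
    // The case class constructor records how the input was read.
    val metrics = InputMetrics(DataReadMethod.Hadoop)

    // Mutate the counters through the increment methods defined above.
    metrics.incBytesRead(1024L)
    metrics.incRecordsRead(10L)

    // Register a callback reporting up-to-date byte counts, then refresh;
    // updateBytesRead() replaces bytesRead with the callback's result.
    var bytesFromSource = 2048L
    metrics.setBytesReadCallback(Some(() => bytesFromSource))
    metrics.updateBytesRead()

    println(s"bytesRead=${metrics.bytesRead}, recordsRead=${metrics.recordsRead}")
    // bytesRead=2048, recordsRead=10
  }
}
```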