Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala | 55
1 file changed, 33 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
index 146cfb9ba8..9d45fff921 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -19,47 +19,58 @@ package org.apache.spark.scheduler
import org.apache.spark.annotation.DeveloperApi
+
/**
* :: DeveloperApi ::
* Information about an [[org.apache.spark.Accumulable]] modified during a task or stage.
+ *
+ * Note: once this is JSON serialized the types of `update` and `value` will be lost and be
+ * cast to strings. This is because the user can define an accumulator of any type and it will
+ * be difficult to preserve the type in consumers of the event log. This does not apply to
+ * internal accumulators that represent task level metrics.
+ *
+ * @param id accumulator ID
+ * @param name accumulator name
+ * @param update partial value from a task, may be None if used on driver to describe a stage
+ * @param value total accumulated value so far, may be None if used on executors to describe a task
+ * @param internal whether this accumulator was internal
+ * @param countFailedValues whether to count this accumulator's partial value if the task failed
*/
@DeveloperApi
-class AccumulableInfo private[spark] (
- val id: Long,
- val name: String,
- val update: Option[String], // represents a partial update within a task
- val value: String,
- val internal: Boolean) {
-
- override def equals(other: Any): Boolean = other match {
- case acc: AccumulableInfo =>
- this.id == acc.id && this.name == acc.name &&
- this.update == acc.update && this.value == acc.value &&
- this.internal == acc.internal
- case _ => false
- }
+case class AccumulableInfo private[spark] (
+ id: Long,
+ name: Option[String],
+ update: Option[Any], // represents a partial update within a task
+ value: Option[Any],
+ private[spark] val internal: Boolean,
+ private[spark] val countFailedValues: Boolean)
- override def hashCode(): Int = {
- val state = Seq(id, name, update, value, internal)
- state.map(_.hashCode).reduceLeft(31 * _ + _)
- }
-}
+/**
+ * A collection of deprecated constructors. This will be removed soon.
+ */
object AccumulableInfo {
+
+ @deprecated("do not create AccumulableInfo", "2.0.0")
def apply(
id: Long,
name: String,
update: Option[String],
value: String,
internal: Boolean): AccumulableInfo = {
- new AccumulableInfo(id, name, update, value, internal)
+ new AccumulableInfo(
+ id, Option(name), update, Option(value), internal, countFailedValues = false)
}
+ @deprecated("do not create AccumulableInfo", "2.0.0")
def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo = {
- new AccumulableInfo(id, name, update, value, internal = false)
+ new AccumulableInfo(
+ id, Option(name), update, Option(value), internal = false, countFailedValues = false)
}
+ @deprecated("do not create AccumulableInfo", "2.0.0")
def apply(id: Long, name: String, value: String): AccumulableInfo = {
- new AccumulableInfo(id, name, None, value, internal = false)
+ new AccumulableInfo(
+ id, Option(name), None, Option(value), internal = false, countFailedValues = false)
}
}
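
For context, a minimal usage sketch against the post-change API shown in this diff (assuming a Spark 2.0-era build on the classpath; the object name AccumulableInfoSketch and the sample values are illustrative, not part of the change). It exercises the deprecated string-based factory, which still compiles but now emits a deprecation warning, and the case-class equality that replaces the removed hand-written equals/hashCode:

import org.apache.spark.scheduler.AccumulableInfo

object AccumulableInfoSketch {
  def main(args: Array[String]): Unit = {
    // Deprecated 1.x-style factory: plain String name/value are wrapped into Options
    // internally and countFailedValues defaults to false. Emits a deprecation warning.
    val a = AccumulableInfo(1L, "my counter", Some("2"), "10")
    val b = AccumulableInfo(1L, "my counter", Some("2"), "10")

    // Case-class equality and hashCode come for free, replacing the removed
    // hand-written equals/hashCode implementations.
    assert(a == b)
    assert(a.hashCode == b.hashCode)

    // The new fields are Options; name, update, or value may legitimately be None,
    // e.g. value when describing a partial update on an executor.
    assert(a.name.contains("my counter"))
    assert(a.update.contains("2"))
    assert(a.value.contains("10"))
  }
}

The Option-typed name, update, and value make the "not applicable" cases explicit instead of relying on sentinel strings, at the cost of losing the concrete value types once the info is JSON serialized, as the added scaladoc notes.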