diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/Accumulable.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/Accumulator.scala | 19 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 5 |
3 files changed, 20 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala index c76720c4bb..799c7e4fd5 100644 --- a/core/src/main/scala/org/apache/spark/Accumulable.scala +++ b/core/src/main/scala/org/apache/spark/Accumulable.scala @@ -17,14 +17,13 @@ package org.apache.spark -import java.io.{ObjectInputStream, Serializable} +import java.io.Serializable import scala.collection.generic.Growable import scala.reflect.ClassTag import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.util.Utils /** @@ -49,6 +48,7 @@ import org.apache.spark.util.Utils * @tparam R the full accumulated data (result type) * @tparam T partial data that can be added in */ +@deprecated("use AccumulatorV2", "2.0.0") class Accumulable[R, T] private ( val id: Long, // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile @@ -162,6 +162,7 @@ class Accumulable[R, T] private ( * @tparam R the full accumulated data (result type) * @tparam T partial data that can be added in */ +@deprecated("use AccumulatorV2", "2.0.0") trait AccumulableParam[R, T] extends Serializable { /** * Add additional data to the accumulator value. Is allowed to modify and return `r` @@ -191,6 +192,7 @@ trait AccumulableParam[R, T] extends Serializable { } +@deprecated("use AccumulatorV2", "2.0.0") private[spark] class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T] extends AccumulableParam[R, T] { diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala index 9b007b9776..e52d36b7b5 100644 --- a/core/src/main/scala/org/apache/spark/Accumulator.scala +++ b/core/src/main/scala/org/apache/spark/Accumulator.scala @@ -17,13 +17,6 @@ package org.apache.spark -import java.util.concurrent.atomic.AtomicLong -import javax.annotation.concurrent.GuardedBy - -import scala.collection.mutable -import scala.ref.WeakReference - -import org.apache.spark.internal.Logging import org.apache.spark.storage.{BlockId, BlockStatus} @@ -58,7 +51,8 @@ import org.apache.spark.storage.{BlockId, BlockStatus} * @param name human-readable name associated with this accumulator * @param countFailedValues whether to accumulate values from failed tasks * @tparam T result type - */ +*/ +@deprecated("use AccumulatorV2", "2.0.0") class Accumulator[T] private[spark] ( // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile @transient private val initialValue: T, @@ -75,6 +69,7 @@ class Accumulator[T] private[spark] ( * * @tparam T type of value to accumulate */ +@deprecated("use AccumulatorV2", "2.0.0") trait AccumulatorParam[T] extends AccumulableParam[T, T] { def addAccumulator(t1: T, t2: T): T = { addInPlace(t1, t2) @@ -82,6 +77,7 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] { } +@deprecated("use AccumulatorV2", "2.0.0") object AccumulatorParam { // The following implicit objects were in SparkContext before 1.2 and users had to @@ -89,21 +85,25 @@ object AccumulatorParam { // them automatically. However, as there are duplicate codes in SparkContext for backward // compatibility, please update them accordingly if you modify the following implicit objects. + @deprecated("use AccumulatorV2", "2.0.0") implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { def addInPlace(t1: Double, t2: Double): Double = t1 + t2 def zero(initialValue: Double): Double = 0.0 } + @deprecated("use AccumulatorV2", "2.0.0") implicit object IntAccumulatorParam extends AccumulatorParam[Int] { def addInPlace(t1: Int, t2: Int): Int = t1 + t2 def zero(initialValue: Int): Int = 0 } + @deprecated("use AccumulatorV2", "2.0.0") implicit object LongAccumulatorParam extends AccumulatorParam[Long] { def addInPlace(t1: Long, t2: Long): Long = t1 + t2 def zero(initialValue: Long): Long = 0L } + @deprecated("use AccumulatorV2", "2.0.0") implicit object FloatAccumulatorParam extends AccumulatorParam[Float] { def addInPlace(t1: Float, t2: Float): Float = t1 + t2 def zero(initialValue: Float): Float = 0f @@ -112,6 +112,7 @@ object AccumulatorParam { // Note: when merging values, this param just adopts the newer value. This is used only // internally for things that shouldn't really be accumulated across tasks, like input // read method, which should be the same across all tasks in the same stage. + @deprecated("use AccumulatorV2", "2.0.0") private[spark] object StringAccumulatorParam extends AccumulatorParam[String] { def addInPlace(t1: String, t2: String): String = t2 def zero(initialValue: String): String = "" @@ -119,12 +120,14 @@ object AccumulatorParam { // Note: this is expensive as it makes a copy of the list every time the caller adds an item. // A better way to use this is to first accumulate the values yourself then them all at once. + @deprecated("use AccumulatorV2", "2.0.0") private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] { def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2 def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T] } // For the internal metric that records what blocks are updated in a particular task + @deprecated("use AccumulatorV2", "2.0.0") private[spark] object UpdatedBlockStatusesAccumulatorParam extends ListAccumulatorParam[(BlockId, BlockStatus)] diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d0f88d4e4d..302dec25c6 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1219,6 +1219,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" * values to using the `+=` method. Only the driver can access the accumulator's `value`. */ + @deprecated("use AccumulatorV2", "2.0.0") def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = { val acc = new Accumulator(initialValue, param) cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc)) @@ -1230,6 +1231,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the * driver can access the accumulator's `value`. */ + @deprecated("use AccumulatorV2", "2.0.0") def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) : Accumulator[T] = { val acc = new Accumulator(initialValue, param, Some(name)) @@ -1243,6 +1245,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @tparam R accumulator result type * @tparam T type that can be added to the accumulator */ + @deprecated("use AccumulatorV2", "2.0.0") def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) : Accumulable[R, T] = { val acc = new Accumulable(initialValue, param) @@ -1257,6 +1260,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @tparam R accumulator result type * @tparam T type that can be added to the accumulator */ + @deprecated("use AccumulatorV2", "2.0.0") def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) : Accumulable[R, T] = { val acc = new Accumulable(initialValue, param, Some(name)) @@ -1270,6 +1274,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by * standard mutable collections. So you can use this with mutable Map, Set, etc. */ + @deprecated("use AccumulatorV2", "2.0.0") def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T] (initialValue: R): Accumulable[R, T] = { val param = new GrowableAccumulableParam[R, T] |