author    Reynold Xin <rxin@databricks.com>    2016-05-02 14:57:00 -0700
committer Reynold Xin <rxin@databricks.com>    2016-05-02 14:57:00 -0700
commit    d5c79f564f3557037c5526e2ee5f963dd100fb34 (patch)
tree      ceb70f968de6cb08dfd459864be676b41bbfe43f /core
parent    8a1ce4899fb9f751dedaaa34ea654dfbc8330852 (diff)
[SPARK-15054] Deprecate old accumulator API
## What changes were proposed in this pull request?

This patch deprecates the old accumulator API.

## How was this patch tested?

N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #12832 from rxin/SPARK-15054.
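For readers migrating, the replacement lives in the `AccumulatorV2` hierarchy added in Spark 2.0. A minimal before/after sketch (assuming an existing `SparkContext` named `sc`; `longAccumulator` is one of the new built-in helpers on `SparkContext`):

```scala
// Old API, deprecated by this patch (still compiles, with warnings):
val oldCounter = sc.accumulator(0L, "counter")
sc.parallelize(1 to 100).foreach(_ => oldCounter += 1)

// New AccumulatorV2-based API:
val counter = sc.longAccumulator("counter")
sc.parallelize(1 to 100).foreach(_ => counter.add(1))
println(counter.value) // read results on the driver only
```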
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulable.scala   |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulator.scala   | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala  |  5
3 files changed, 20 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index c76720c4bb..799c7e4fd5 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -17,14 +17,13 @@
package org.apache.spark
-import java.io.{ObjectInputStream, Serializable}
+import java.io.Serializable
import scala.collection.generic.Growable
import scala.reflect.ClassTag
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.util.Utils
/**
@@ -49,6 +48,7 @@ import org.apache.spark.util.Utils
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
+@deprecated("use AccumulatorV2", "2.0.0")
class Accumulable[R, T] private (
val id: Long,
// SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
@@ -162,6 +162,7 @@ class Accumulable[R, T] private (
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
+@deprecated("use AccumulatorV2", "2.0.0")
trait AccumulableParam[R, T] extends Serializable {
/**
* Add additional data to the accumulator value. Is allowed to modify and return `r`
@@ -191,6 +192,7 @@ trait AccumulableParam[R, T] extends Serializable {
}
+@deprecated("use AccumulatorV2", "2.0.0")
private[spark] class
GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
extends AccumulableParam[R, T] {
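Custom `AccumulableParam` implementations like `GrowableAccumulableParam` above map onto subclasses of `org.apache.spark.util.AccumulatorV2` in the new API. A hedged sketch of what such a port can look like (the `SetAccumulator` name and its `Set[String]` payload are illustrative, not part of this patch):

```scala
import org.apache.spark.util.AccumulatorV2

// Collects distinct strings; plays the role an AccumulableParam[Set[String], String]
// would have played under the deprecated API.
class SetAccumulator extends AccumulatorV2[String, Set[String]] {
  private var set = Set.empty[String]

  override def isZero: Boolean = set.isEmpty
  override def copy(): SetAccumulator = {
    val acc = new SetAccumulator
    acc.set = set
    acc
  }
  override def reset(): Unit = set = Set.empty[String]
  override def add(v: String): Unit = set += v
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = set ++= other.value
  override def value: Set[String] = set
}

// Usage: register with the SparkContext so executors' partial results are merged.
// val acc = new SetAccumulator
// sc.register(acc, "distinctWords")
```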
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 9b007b9776..e52d36b7b5 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -17,13 +17,6 @@
package org.apache.spark
-import java.util.concurrent.atomic.AtomicLong
-import javax.annotation.concurrent.GuardedBy
-
-import scala.collection.mutable
-import scala.ref.WeakReference
-
-import org.apache.spark.internal.Logging
import org.apache.spark.storage.{BlockId, BlockStatus}
@@ -58,7 +51,8 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
* @param name human-readable name associated with this accumulator
* @param countFailedValues whether to accumulate values from failed tasks
* @tparam T result type
- */
+*/
+@deprecated("use AccumulatorV2", "2.0.0")
class Accumulator[T] private[spark] (
// SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
@transient private val initialValue: T,
@@ -75,6 +69,7 @@ class Accumulator[T] private[spark] (
*
* @tparam T type of value to accumulate
*/
+@deprecated("use AccumulatorV2", "2.0.0")
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
addInPlace(t1, t2)
@@ -82,6 +77,7 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
}
+@deprecated("use AccumulatorV2", "2.0.0")
object AccumulatorParam {
// The following implicit objects were in SparkContext before 1.2 and users had to
@@ -89,21 +85,25 @@ object AccumulatorParam {
// them automatically. However, as there is duplicate code in SparkContext for backward
// compatibility, please update them accordingly if you modify the following implicit objects.
+ @deprecated("use AccumulatorV2", "2.0.0")
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double): Double = 0.0
}
+ @deprecated("use AccumulatorV2", "2.0.0")
implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int): Int = 0
}
+ @deprecated("use AccumulatorV2", "2.0.0")
implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long): Long = t1 + t2
def zero(initialValue: Long): Long = 0L
}
+ @deprecated("use AccumulatorV2", "2.0.0")
implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float): Float = t1 + t2
def zero(initialValue: Float): Float = 0f
@@ -112,6 +112,7 @@ object AccumulatorParam {
// Note: when merging values, this param just adopts the newer value. This is used only
// internally for things that shouldn't really be accumulated across tasks, like input
// read method, which should be the same across all tasks in the same stage.
+ @deprecated("use AccumulatorV2", "2.0.0")
private[spark] object StringAccumulatorParam extends AccumulatorParam[String] {
def addInPlace(t1: String, t2: String): String = t2
def zero(initialValue: String): String = ""
@@ -119,12 +120,14 @@ object AccumulatorParam {
// Note: this is expensive as it makes a copy of the list every time the caller adds an item.
// A better way to use this is to first accumulate the values yourself then add them all at once.
+ @deprecated("use AccumulatorV2", "2.0.0")
private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] {
def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2
def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T]
}
// For the internal metric that records what blocks are updated in a particular task
+ @deprecated("use AccumulatorV2", "2.0.0")
private[spark] object UpdatedBlockStatusesAccumulatorParam
extends ListAccumulatorParam[(BlockId, BlockStatus)]
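The implicit `AccumulatorParam` objects deprecated above are what let `sc.accumulator(initialValue)` resolve a param for primitive types without an explicit argument. The built-in V2 accumulators replace that implicit machinery; a minimal sketch, again assuming a `SparkContext` named `sc`:

```scala
// Deprecated path: DoubleAccumulatorParam is found implicitly.
val sum = sc.accumulator(0.0, "sum")
sc.parallelize(Seq(1.0, 2.0, 3.0)).foreach(x => sum += x)

// Replacement: a built-in DoubleAccumulator, no implicit resolution involved.
val sumV2 = sc.doubleAccumulator("sum")
sc.parallelize(Seq(1.0, 2.0, 3.0)).foreach(x => sumV2.add(x))
```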
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d0f88d4e4d..302dec25c6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1219,6 +1219,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `+=` method. Only the driver can access the accumulator's `value`.
*/
+ @deprecated("use AccumulatorV2", "2.0.0")
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = {
val acc = new Accumulator(initialValue, param)
cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
@@ -1230,6 +1231,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
* driver can access the accumulator's `value`.
*/
+ @deprecated("use AccumulatorV2", "2.0.0")
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
: Accumulator[T] = {
val acc = new Accumulator(initialValue, param, Some(name))
@@ -1243,6 +1245,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
+ @deprecated("use AccumulatorV2", "2.0.0")
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T])
: Accumulable[R, T] = {
val acc = new Accumulable(initialValue, param)
@@ -1257,6 +1260,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
+ @deprecated("use AccumulatorV2", "2.0.0")
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T])
: Accumulable[R, T] = {
val acc = new Accumulable(initialValue, param, Some(name))
@@ -1270,6 +1274,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*/
+ @deprecated("use AccumulatorV2", "2.0.0")
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
(initialValue: R): Accumulable[R, T] = {
val param = new GrowableAccumulableParam[R, T]
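For the deprecated `accumulableCollection` above, the closest V2 counterpart is `sc.collectionAccumulator`. A minimal sketch, assuming a `SparkContext` named `sc`; note the different result type:

```scala
// CollectionAccumulator stands in for accumulableCollection in the V2 API.
val names = sc.collectionAccumulator[String]("names")
sc.parallelize(Seq("a", "b", "c")).foreach(x => names.add(x))
// On the driver, names.value is a java.util.List[String] rather than a Scala
// Growable, so callers may need to convert (e.g. via scala.collection.JavaConverters).
```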