aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-06-12 11:44:33 -0700
committerReynold Xin <rxin@databricks.com>2016-06-12 11:44:33 -0700
commitf51dfe616b24b4234199c98ea857a586a93a889f (patch)
tree2803e1675f1948670ebc3f042789f4b401aa2b3e /core
parent50248dcfff3ba79b73323f3a804c1e19a8be6097 (diff)
downloadspark-f51dfe616b24b4234199c98ea857a586a93a889f.tar.gz
spark-f51dfe616b24b4234199c98ea857a586a93a889f.tar.bz2
spark-f51dfe616b24b4234199c98ea857a586a93a889f.zip
[SPARK-15086][CORE][STREAMING] Deprecate old Java accumulator API
## What changes were proposed in this pull request? - Deprecate old Java accumulator API; should use Scala now - Update Java tests and examples - Don't bother testing old accumulator API in Java 8 (too) - (fix a misspelling too) ## How was this patch tested? Jenkins tests Author: Sean Owen <sowen@cloudera.com> Closes #13606 from srowen/SPARK-15086.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala12
-rw-r--r--core/src/test/java/org/apache/spark/JavaAPISuite.java6
3 files changed, 17 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 230fabd211..8bfe7ecd8f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1245,7 +1245,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
- * with `+=`. Only the driver can access the accumuable's `value`.
+ * with `+=`. Only the driver can access the accumulable's `value`.
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
@@ -1259,8 +1259,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
- * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
- * access the accumuable's `value`.
+ * Spark UI. Tasks can add values to the accumulable using the `+=` operator. Only the driver can
+ * access the accumulable's `value`.
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 485a8b4222..131f36f547 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -530,6 +530,7 @@ class JavaSparkContext(val sc: SparkContext)
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
+ @deprecated("use sc().longAccumulator()", "2.0.0")
def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]
@@ -539,6 +540,7 @@ class JavaSparkContext(val sc: SparkContext)
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
+ @deprecated("use sc().longAccumulator(String)", "2.0.0")
def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
sc.accumulator(initialValue, name)(IntAccumulatorParam)
.asInstanceOf[Accumulator[java.lang.Integer]]
@@ -547,6 +549,7 @@ class JavaSparkContext(val sc: SparkContext)
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
+ @deprecated("use sc().doubleAccumulator()", "2.0.0")
def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]
@@ -556,6 +559,7 @@ class JavaSparkContext(val sc: SparkContext)
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
+ @deprecated("use sc().doubleAccumulator(String)", "2.0.0")
def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
sc.accumulator(initialValue, name)(DoubleAccumulatorParam)
.asInstanceOf[Accumulator[java.lang.Double]]
@@ -564,6 +568,7 @@ class JavaSparkContext(val sc: SparkContext)
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
+ @deprecated("use sc().longAccumulator()", "2.0.0")
def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)
/**
@@ -572,6 +577,7 @@ class JavaSparkContext(val sc: SparkContext)
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
+ @deprecated("use sc().longAccumulator(String)", "2.0.0")
def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
intAccumulator(initialValue, name)
@@ -579,6 +585,7 @@ class JavaSparkContext(val sc: SparkContext)
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
+ @deprecated("use sc().doubleAccumulator()", "2.0.0")
def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
doubleAccumulator(initialValue)
@@ -589,6 +596,7 @@ class JavaSparkContext(val sc: SparkContext)
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
+ @deprecated("use sc().doubleAccumulator(String)", "2.0.0")
def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
doubleAccumulator(initialValue, name)
@@ -613,7 +621,7 @@ class JavaSparkContext(val sc: SparkContext)
/**
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
- * can "add" values with `add`. Only the master can access the accumuable's `value`.
+ * can "add" values with `add`. Only the master can access the accumulable's `value`.
*/
@deprecated("use AccumulatorV2", "2.0.0")
def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
@@ -621,7 +629,7 @@ class JavaSparkContext(val sc: SparkContext)
/**
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
- * can "add" values with `add`. Only the master can access the accumuable's `value`.
+ * can "add" values with `add`. Only the master can access the accumulable's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 04f92d6016..7bac068321 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -70,6 +70,7 @@ import org.apache.spark.partial.PartialResult;
import org.apache.spark.rdd.RDD;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.StatCounter;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -287,7 +288,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void foreach() {
- final Accumulator<Integer> accum = sc.accumulator(0);
+ final LongAccumulator accum = sc.sc().longAccumulator();
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
rdd.foreach(new VoidFunction<String>() {
@Override
@@ -300,7 +301,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void foreachPartition() {
- final Accumulator<Integer> accum = sc.accumulator(0);
+ final LongAccumulator accum = sc.sc().longAccumulator();
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
@Override
@@ -1377,6 +1378,7 @@ public class JavaAPISuite implements Serializable {
assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
}
+ @SuppressWarnings("deprecation")
@Test
public void accumulators() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));