From f51dfe616b24b4234199c98ea857a586a93a889f Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Sun, 12 Jun 2016 11:44:33 -0700 Subject: [SPARK-15086][CORE][STREAMING] Deprecate old Java accumulator API ## What changes were proposed in this pull request? - Deprecate old Java accumulator API; should use Scala now - Update Java tests and examples - Don't bother testing old accumulator API in Java 8 (too) - (fix a misspelling too) ## How was this patch tested? Jenkins tests Author: Sean Owen Closes #13606 from srowen/SPARK-15086. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++--- .../scala/org/apache/spark/api/java/JavaSparkContext.scala | 12 ++++++++++-- core/src/test/java/org/apache/spark/JavaAPISuite.java | 6 ++++-- 3 files changed, 17 insertions(+), 7 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 230fabd211..8bfe7ecd8f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1245,7 +1245,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values - * with `+=`. Only the driver can access the accumuable's `value`. + * with `+=`. Only the driver can access the accumulable's `value`. * @tparam R accumulator result type * @tparam T type that can be added to the accumulator */ @@ -1259,8 +1259,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the - * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can - * access the accumuable's `value`. + * Spark UI. Tasks can add values to the accumulable using the `+=` operator. Only the driver can + * access the accumulable's `value`. * @tparam R accumulator result type * @tparam T type that can be added to the accumulator */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 485a8b4222..131f36f547 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -530,6 +530,7 @@ class JavaSparkContext(val sc: SparkContext) * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values * to using the `add` method. Only the master can access the accumulator's `value`. */ + @deprecated("use sc().longAccumulator()", "2.0.0") def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] = sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]] @@ -539,6 +540,7 @@ class JavaSparkContext(val sc: SparkContext) * * This version supports naming the accumulator for display in Spark's web UI. */ + @deprecated("use sc().longAccumulator(String)", "2.0.0") def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] = sc.accumulator(initialValue, name)(IntAccumulatorParam) .asInstanceOf[Accumulator[java.lang.Integer]] @@ -547,6 +549,7 @@ class JavaSparkContext(val sc: SparkContext) * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values * to using the `add` method. Only the master can access the accumulator's `value`. */ + @deprecated("use sc().doubleAccumulator()", "2.0.0") def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] = sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]] @@ -556,6 +559,7 @@ class JavaSparkContext(val sc: SparkContext) * * This version supports naming the accumulator for display in Spark's web UI. */ + @deprecated("use sc().doubleAccumulator(String)", "2.0.0") def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] = sc.accumulator(initialValue, name)(DoubleAccumulatorParam) .asInstanceOf[Accumulator[java.lang.Double]] @@ -564,6 +568,7 @@ class JavaSparkContext(val sc: SparkContext) * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values * to using the `add` method. Only the master can access the accumulator's `value`. */ + @deprecated("use sc().longAccumulator()", "2.0.0") def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue) /** @@ -572,6 +577,7 @@ class JavaSparkContext(val sc: SparkContext) * * This version supports naming the accumulator for display in Spark's web UI. */ + @deprecated("use sc().longAccumulator(String)", "2.0.0") def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] = intAccumulator(initialValue, name) @@ -579,6 +585,7 @@ class JavaSparkContext(val sc: SparkContext) * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values * to using the `add` method. Only the master can access the accumulator's `value`. */ + @deprecated("use sc().doubleAccumulator()", "2.0.0") def accumulator(initialValue: Double): Accumulator[java.lang.Double] = doubleAccumulator(initialValue) @@ -589,6 +596,7 @@ class JavaSparkContext(val sc: SparkContext) * * This version supports naming the accumulator for display in Spark's web UI. */ + @deprecated("use sc().doubleAccumulator(String)", "2.0.0") def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] = doubleAccumulator(initialValue, name) @@ -613,7 +621,7 @@ class JavaSparkContext(val sc: SparkContext) /** * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks - * can "add" values with `add`. Only the master can access the accumuable's `value`. + * can "add" values with `add`. Only the master can access the accumulable's `value`. */ @deprecated("use AccumulatorV2", "2.0.0") def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] = @@ -621,7 +629,7 @@ class JavaSparkContext(val sc: SparkContext) /** * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks - * can "add" values with `add`. Only the master can access the accumuable's `value`. + * can "add" values with `add`. Only the master can access the accumulable's `value`. * * This version supports naming the accumulator for display in Spark's web UI. */ diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 04f92d6016..7bac068321 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -70,6 +70,7 @@ import org.apache.spark.partial.PartialResult; import org.apache.spark.rdd.RDD; import org.apache.spark.serializer.KryoSerializer; import org.apache.spark.storage.StorageLevel; +import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.StatCounter; // The test suite itself is Serializable so that anonymous Function implementations can be @@ -287,7 +288,7 @@ public class JavaAPISuite implements Serializable { @Test public void foreach() { - final Accumulator accum = sc.accumulator(0); + final LongAccumulator accum = sc.sc().longAccumulator(); JavaRDD rdd = sc.parallelize(Arrays.asList("Hello", "World")); rdd.foreach(new VoidFunction() { @Override @@ -300,7 +301,7 @@ public class JavaAPISuite implements Serializable { @Test public void foreachPartition() { - final Accumulator accum = sc.accumulator(0); + final LongAccumulator accum = sc.sc().longAccumulator(); JavaRDD rdd = sc.parallelize(Arrays.asList("Hello", "World")); rdd.foreachPartition(new VoidFunction>() { @Override @@ -1377,6 +1378,7 @@ public class JavaAPISuite implements Serializable { assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); } + @SuppressWarnings("deprecation") @Test public void accumulators() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); -- cgit v1.2.3