diff options
author | Sean Owen <sowen@cloudera.com> | 2016-06-12 11:44:33 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-06-12 11:44:33 -0700 |
commit | f51dfe616b24b4234199c98ea857a586a93a889f (patch) | |
tree | 2803e1675f1948670ebc3f042789f4b401aa2b3e /external/java8-tests/src/test/java | |
parent | 50248dcfff3ba79b73323f3a804c1e19a8be6097 (diff) | |
download | spark-f51dfe616b24b4234199c98ea857a586a93a889f.tar.gz spark-f51dfe616b24b4234199c98ea857a586a93a889f.tar.bz2 spark-f51dfe616b24b4234199c98ea857a586a93a889f.zip |
[SPARK-15086][CORE][STREAMING] Deprecate old Java accumulator API
## What changes were proposed in this pull request?
- Deprecate old Java accumulator API; should use Scala now
- Update Java tests and examples
- Don't bother testing old accumulator API in Java 8 (too)
- (fix a misspelling too)
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes #13606 from srowen/SPARK-15086.
Diffstat (limited to 'external/java8-tests/src/test/java')
-rw-r--r-- | external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java | 39 | ||||
-rw-r--r-- | external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java | 28 |
2 files changed, 0 insertions, 67 deletions
diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java index 8ee0e7e415..fa3a66e73c 100644 --- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java +++ b/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java @@ -33,8 +33,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.apache.spark.Accumulator; -import org.apache.spark.AccumulatorParam; import org.apache.spark.api.java.JavaDoubleRDD; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -303,43 +301,6 @@ public class Java8RDDAPISuite implements Serializable { } @Test - public void accumulators() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - - Accumulator<Integer> intAccum = sc.intAccumulator(10); - rdd.foreach(intAccum::add); - Assert.assertEquals((Integer) 25, intAccum.value()); - - Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0); - rdd.foreach(x -> doubleAccum.add((double) x)); - Assert.assertEquals((Double) 25.0, doubleAccum.value()); - - // Try a custom accumulator type - AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() { - @Override - public Float addInPlace(Float r, Float t) { - return r + t; - } - @Override - public Float addAccumulator(Float r, Float t) { - return r + t; - } - @Override - public Float zero(Float initialValue) { - return 0.0f; - } - }; - - Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); - rdd.foreach(x -> floatAccum.add((float) x)); - Assert.assertEquals((Float) 25.0f, floatAccum.value()); - - // Test the setValue method - floatAccum.setValue(5.0f); - Assert.assertEquals((Float) 5.0f, floatAccum.value()); - } - - @Test public void keyBy() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect(); diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java index cf5607f5e8..338ca54ab8 100644 --- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java +++ b/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java @@ -27,7 +27,6 @@ import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; -import org.apache.spark.Accumulator; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.JavaPairRDD; @@ -362,33 +361,6 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ } @Test - public void testForeachRDD() { - final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0); - final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0); - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1,1,1), - Arrays.asList(1,1,1)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output - - stream.foreachRDD(rdd -> { - accumRdd.add(1); - rdd.foreach(x -> accumEle.add(1)); - }); - - // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java - stream.foreachRDD((rdd, time) -> { - return; - }); - - JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(2, accumRdd.value().intValue()); - Assert.assertEquals(6, accumEle.value().intValue()); - } - - @Test public void testPairFlatMap() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants"), |