diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-05 20:54:08 -0500 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-05 20:55:17 -0500 |
commit | 86af64b0a6fde5a6418727a77b43bdfeda1b81cd (patch) | |
tree | e1d1c002bfb29de789de9be463dff273f41c43c3 /core/src/test | |
parent | 7ab9f091408dad2c264cede965ab284c9d33deb9 (diff) | |
download | spark-86af64b0a6fde5a6418727a77b43bdfeda1b81cd.tar.gz spark-86af64b0a6fde5a6418727a77b43bdfeda1b81cd.tar.bz2 spark-86af64b0a6fde5a6418727a77b43bdfeda1b81cd.zip |
Fix Accumulators in Java, and add a test for them
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/spark/JavaAPISuite.java | 44 |
1 files changed, 44 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 33d5fc2d89..b99e790093 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -581,4 +581,48 @@ public class JavaAPISuite implements Serializable { JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles); zipped.count(); } + + @Test + public void accumulators() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + + final Accumulator<Integer> intAccum = sc.intAccumulator(10); + rdd.foreach(new VoidFunction<Integer>() { + public void call(Integer x) { + intAccum.add(x); + } + }); + Assert.assertEquals((Integer) 25, intAccum.value()); + + final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0); + rdd.foreach(new VoidFunction<Integer>() { + public void call(Integer x) { + doubleAccum.add((double) x); + } + }); + Assert.assertEquals((Double) 25.0, doubleAccum.value()); + + // Try a custom accumulator type + AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() { + public Float addInPlace(Float r, Float t) { + return r + t; + } + + public Float addAccumulator(Float r, Float t) { + return r + t; + } + + public Float zero(Float initialValue) { + return 0.0f; + } + }; + + final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam); + rdd.foreach(new VoidFunction<Integer>() { + public void call(Integer x) { + floatAccum.add((float) x); + } + }); + Assert.assertEquals((Float) 25.0f, floatAccum.value()); + } } |