aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-01-05 20:54:08 -0500
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-01-05 20:55:17 -0500
commit86af64b0a6fde5a6418727a77b43bdfeda1b81cd (patch)
treee1d1c002bfb29de789de9be463dff273f41c43c3 /core/src/test
parent7ab9f091408dad2c264cede965ab284c9d33deb9 (diff)
downloadspark-86af64b0a6fde5a6418727a77b43bdfeda1b81cd.tar.gz
spark-86af64b0a6fde5a6418727a77b43bdfeda1b81cd.tar.bz2
spark-86af64b0a6fde5a6418727a77b43bdfeda1b81cd.zip
Fix Accumulators in Java, and add a test for them
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/spark/JavaAPISuite.java44
1 files changed, 44 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 33d5fc2d89..b99e790093 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -581,4 +581,48 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
zipped.count();
}
+
+ @Test
+ public void accumulators() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+
+ final Accumulator<Integer> intAccum = sc.intAccumulator(10);
+ rdd.foreach(new VoidFunction<Integer>() {
+ public void call(Integer x) {
+ intAccum.add(x);
+ }
+ });
+ Assert.assertEquals((Integer) 25, intAccum.value());
+
+ final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+ rdd.foreach(new VoidFunction<Integer>() {
+ public void call(Integer x) {
+ doubleAccum.add((double) x);
+ }
+ });
+ Assert.assertEquals((Double) 25.0, doubleAccum.value());
+
+ // Try a custom accumulator type
+ AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+ public Float addInPlace(Float r, Float t) {
+ return r + t;
+ }
+
+ public Float addAccumulator(Float r, Float t) {
+ return r + t;
+ }
+
+ public Float zero(Float initialValue) {
+ return 0.0f;
+ }
+ };
+
+ final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+ rdd.foreach(new VoidFunction<Integer>() {
+ public void call(Integer x) {
+ floatAccum.add((float) x);
+ }
+ });
+ Assert.assertEquals((Float) 25.0f, floatAccum.value());
+ }
}