aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorImran Rashid <imran@quantifind.com>2012-08-14 14:08:22 -0700
committerImran Rashid <imran@quantifind.com>2012-10-04 23:05:01 -0700
commit82a3327862d678500ecebd004c1b5b04862aaad8 (patch)
tree815814c77e166bea1ff151a06cef8f48d7c479be /core/src/test/scala
parentd6d071f19a1024d31774866c9ede7007905c90c3 (diff)
downloadspark-82a3327862d678500ecebd004c1b5b04862aaad8.tar.gz
spark-82a3327862d678500ecebd004c1b5b04862aaad8.tar.bz2
spark-82a3327862d678500ecebd004c1b5b04862aaad8.zip
make accumulator.localValue public, add tests
Conflicts: core/src/test/scala/spark/AccumulatorSuite.scala
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/spark/AccumulatorSuite.scala15
1 files changed, 15 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index 71df5941e5..ee14820c6a 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -112,4 +112,19 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc.stop()
}
}
+
+ test ("localValue readable in tasks") {
+ import SetAccum._
+ val maxI = 1000
+ for (nThreads <- List(1, 10)) { //test single & multi-threaded
+ val sc = new SparkContext("local[" + nThreads + "]", "test")
+ val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+ val d = sc.parallelize(1 to maxI)
+ d.foreach {
+ x => acc.localValue += x
+ }
+ acc.value should be ( (1 to maxI).toSet)
+ }
+ }
+
}