diff options
author | Imran Rashid <imran@quantifind.com> | 2012-08-14 14:08:22 -0700 |
---|---|---|
committer | Imran Rashid <imran@quantifind.com> | 2012-10-04 23:05:01 -0700 |
commit | 82a3327862d678500ecebd004c1b5b04862aaad8 (patch) | |
tree | 815814c77e166bea1ff151a06cef8f48d7c479be /core/src/test/scala | |
parent | d6d071f19a1024d31774866c9ede7007905c90c3 (diff) | |
download | spark-82a3327862d678500ecebd004c1b5b04862aaad8.tar.gz spark-82a3327862d678500ecebd004c1b5b04862aaad8.tar.bz2 spark-82a3327862d678500ecebd004c1b5b04862aaad8.zip |
make accumulator.localValue public, add tests
Conflicts:
core/src/test/scala/spark/AccumulatorSuite.scala
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/spark/AccumulatorSuite.scala | 15 |
1 files changed, 15 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala index 71df5941e5..ee14820c6a 100644 --- a/core/src/test/scala/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/spark/AccumulatorSuite.scala @@ -112,4 +112,19 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter sc.stop() } } + + test ("localValue readable in tasks") { + import SetAccum._ + val maxI = 1000 + for (nThreads <- List(1, 10)) { //test single & multi-threaded + val sc = new SparkContext("local[" + nThreads + "]", "test") + val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]()) + val d = sc.parallelize(1 to maxI) + d.foreach { + x => acc.localValue += x + } + acc.value should be ( (1 to maxI).toSet) + } + } + } |