diff options
author | Allan Douglas R. de Oliveira <allandouglas@gmail.com> | 2014-06-20 11:03:03 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-06-20 11:03:03 -0700 |
commit | 6a224c31e8563156ad5732a23667e73076984ae1 (patch) | |
tree | e3364e0ab07258a483668635a90da442a6d0a8df /core/src/test/scala | |
parent | d484ddeff1440d8e14e05c3cd7e7a18746f1a586 (diff) | |
download | spark-6a224c31e8563156ad5732a23667e73076984ae1.tar.gz spark-6a224c31e8563156ad5732a23667e73076984ae1.tar.bz2 spark-6a224c31e8563156ad5732a23667e73076984ae1.zip |
SPARK-1868: Users should be allowed to cogroup at least 4 RDDs
Adds cogroup for 4 RDDs.
Author: Allan Douglas R. de Oliveira <allandouglas@gmail.com>
Closes #813 from douglaz/more_cogroups and squashes the following commits:
f8d6273 [Allan Douglas R. de Oliveira] Test python groupWith for one more case
0e9009c [Allan Douglas R. de Oliveira] Added scala tests
c3ffcdd [Allan Douglas R. de Oliveira] Added java tests
517a67f [Allan Douglas R. de Oliveira] Added tests for python groupWith
2f402d5 [Allan Douglas R. de Oliveira] Removed TODO
17474f4 [Allan Douglas R. de Oliveira] Use new cogroup function
7877a2a [Allan Douglas R. de Oliveira] Fixed code
ba02414 [Allan Douglas R. de Oliveira] Added varargs cogroup to pyspark
c4a8a51 [Allan Douglas R. de Oliveira] Added java cogroup 4
e94963c [Allan Douglas R. de Oliveira] Fixed spacing
f1ee57b [Allan Douglas R. de Oliveira] Fixed scala style issues
d7196f1 [Allan Douglas R. de Oliveira] Allow the cogroup of 4 RDDs
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 33 |
1 files changed, 33 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 0b9004448a..447e38ec9d 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -249,6 +249,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { )) } + test("groupWith3") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val rdd3 = sc.parallelize(Array((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd'))) + val joined = rdd1.groupWith(rdd2, rdd3).collect() + assert(joined.size === 4) + val joinedSet = joined.map(x => (x._1, + (x._2._1.toList, x._2._2.toList, x._2._3.toList))).toSet + assert(joinedSet === Set( + (1, (List(1, 2), List('x'), List('a'))), + (2, (List(1), List('y', 'z'), List())), + (3, (List(1), List(), List('b'))), + (4, (List(), List('w'), List('c', 'd'))) + )) + } + + test("groupWith4") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val rdd3 = sc.parallelize(Array((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd'))) + val rdd4 = sc.parallelize(Array((2, '@'))) + val joined = rdd1.groupWith(rdd2, rdd3, rdd4).collect() + assert(joined.size === 4) + val joinedSet = joined.map(x => (x._1, + (x._2._1.toList, x._2._2.toList, x._2._3.toList, x._2._4.toList))).toSet + assert(joinedSet === Set( + (1, (List(1, 2), List('x'), List('a'), List())), + (2, (List(1), List('y', 'z'), List(), List('@'))), + (3, (List(1), List(), List('b'), List())), + (4, (List(), List('w'), List('c', 'd'), List())) + )) + } + test("zero-partition RDD") { val emptyDir = Files.createTempDir() emptyDir.deleteOnExit() |