aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-04-26 16:57:46 -0700
committerShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-04-26 16:57:46 -0700
commitc9c4954d994c5ba824e71c1c5cd8d5de531caf78 (patch)
tree02d9d2202b1cf5c5331e15006b47a31ca19d45c6 /core/src/test/scala
parent1f20ef256715e5a84ba1661e235b6eda21a70b5b (diff)
downloadspark-c9c4954d994c5ba824e71c1c5cd8d5de531caf78.tar.gz
spark-c9c4954d994c5ba824e71c1c5cd8d5de531caf78.tar.bz2
spark-c9c4954d994c5ba824e71c1c5cd8d5de531caf78.zip
Add an interface to zip iterators of multiple RDDs
The current code supports 2, 3 or 4 arguments but can be extended to more arguments if required.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/spark/MapZippedPartitionsSuite.scala34
1 files changed, 34 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/MapZippedPartitionsSuite.scala b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala
new file mode 100644
index 0000000000..f65a646416
--- /dev/null
+++ b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala
@@ -0,0 +1,34 @@
+package spark
+
+import scala.collection.immutable.NumericRange
+
+import org.scalatest.FunSuite
+import org.scalatest.prop.Checkers
+import org.scalacheck.Arbitrary._
+import org.scalacheck.Gen
+import org.scalacheck.Prop._
+
+import SparkContext._
+
+
+object MapZippedPartitionsSuite {
+ def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) : Iterator[Int] = {
+ Iterator(i.toArray.size, s.toArray.size, d.toArray.size)
+ }
+}
+
+class MapZippedPartitionsSuite extends FunSuite with LocalSparkContext {
+ test("print sizes") {
+ sc = new SparkContext("local", "test")
+ val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
+ val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
+
+ val zippedRDD = data1.zipAndMapPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3)
+
+ val obtainedSizes = zippedRDD.collect()
+ val expectedSizes = Array(2, 3, 1, 2, 3, 1)
+ assert(obtainedSizes.size == 6)
+ assert(obtainedSizes.zip(expectedSizes).forall(x => x._1 == x._2))
+ }
+}