diff options
author | Shivaram Venkataraman <shivaram@eecs.berkeley.edu> | 2013-04-26 16:57:46 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@eecs.berkeley.edu> | 2013-04-26 16:57:46 -0700 |
commit | c9c4954d994c5ba824e71c1c5cd8d5de531caf78 (patch) | |
tree | 02d9d2202b1cf5c5331e15006b47a31ca19d45c6 /core/src/test/scala | |
parent | 1f20ef256715e5a84ba1661e235b6eda21a70b5b (diff) | |
download | spark-c9c4954d994c5ba824e71c1c5cd8d5de531caf78.tar.gz spark-c9c4954d994c5ba824e71c1c5cd8d5de531caf78.tar.bz2 spark-c9c4954d994c5ba824e71c1c5cd8d5de531caf78.zip |
Add an interface to zip iterators of multiple RDDs
The current code supports 2, 3 or 4 arguments but can be extended
to more arguments if required.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/spark/MapZippedPartitionsSuite.scala | 34 |
1 files changed, 34 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/MapZippedPartitionsSuite.scala b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala
new file mode 100644
index 0000000000..f65a646416
--- /dev/null
+++ b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala
@@ -0,0 +1,34 @@
+package spark
+
+import scala.collection.immutable.NumericRange
+
+import org.scalatest.FunSuite
+import org.scalatest.prop.Checkers
+import org.scalacheck.Arbitrary._
+import org.scalacheck.Gen
+import org.scalacheck.Prop._
+
+import SparkContext._
+
+
+object MapZippedPartitionsSuite {
+  def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) : Iterator[Int] = {
+    Iterator(i.toArray.size, s.toArray.size, d.toArray.size)
+  }
+}
+
+class MapZippedPartitionsSuite extends FunSuite with LocalSparkContext {
+  test("print sizes") {
+    sc = new SparkContext("local", "test")
+    val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
+    val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
+
+    val zippedRDD = data1.zipAndMapPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3)
+
+    val obtainedSizes = zippedRDD.collect()
+    val expectedSizes = Array(2, 3, 1, 2, 3, 1)
+    assert(obtainedSizes.size == 6)
+    assert(obtainedSizes.zip(expectedSizes).forall(x => x._1 == x._2))
+  }
+}