From ce8ec5456169682f27f846e7b8d51e6c4bcf75e3 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 8 Apr 2014 18:15:52 -0700 Subject: Spark 1271: Co-Group and Group-By should pass Iterable[X] Author: Holden Karau Closes #242 from holdenk/spark-1320-cogroupandgroupshouldpassiterator and squashes the following commits: f289536 [Holden Karau] Fix bad merge, should have been Iterable rather than Iterator 77048f8 [Holden Karau] Fix merge up to master d3fe909 [Holden Karau] use toSeq instead 7a092a3 [Holden Karau] switch resultitr to resultiterable eb06216 [Holden Karau] maybe I should have had a coffee first. use correct import for guava iterables c5075aa [Holden Karau] If guava 14 had iterables 2d06e10 [Holden Karau] Fix Java 8 cogroup tests for the new API 11e730c [Holden Karau] Fix streaming tests 66b583d [Holden Karau] Fix the core test suite to compile 4ed579b [Holden Karau] Refactor from iterator to iterable d052c07 [Holden Karau] Python tests now pass with iterator pandas 3bcd81d [Holden Karau] Revert "Try and make pickling list iterators work" cd1e81c [Holden Karau] Try and make pickling list iterators work c60233a [Holden Karau] Start investigating moving to iterators for python API like the Java/Scala one. tl;dr: We will have to write our own iterator since the default one doesn't pickle well 88a5cef [Holden Karau] Fix cogroup test in JavaAPISuite for streaming a5ee714 [Holden Karau] oops, was checking wrong iterator e687f21 [Holden Karau] Fix groupbykey test in JavaAPISuite of streaming ec8cc3e [Holden Karau] Fix test issues\! 4b0eeb9 [Holden Karau] Switch cast in PairDStreamFunctions fa395c9 [Holden Karau] Revert "Add a join based on the problem in SVD" ec99e32 [Holden Karau] Revert "Revert this but for now put things in list pandas" b692868 [Holden Karau] Revert 7e533f7 [Holden Karau] Fix the bug 8a5153a [Holden Karau] Revert me, but we have some stuff to debug b4e86a9 [Holden Karau] Add a join based on the problem in SVD c4510e2 [Holden Karau] Revert this but for now put things in list pandas b4e0b1d [Holden Karau] Fix style issues 71e8b9f [Holden Karau] I really need to stop calling size on iterators, it is the path of sadness. b1ae51a [Holden Karau] Fix some of the types in the streaming JavaAPI suite. Probably still needs more work 37888ec [Holden Karau] core/tests now pass 249abde [Holden Karau] org.apache.spark.rdd.PairRDDFunctionsSuite passes 6698186 [Holden Karau] Revert "I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy" fe992fe [Holden Karau] hmmm try and fix up basic operation suite 172705c [Holden Karau] Fix Java API suite caafa63 [Holden Karau] I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy 88b3329 [Holden Karau] Fix groupbykey to actually give back an iterator 4991af6 [Holden Karau] Fix some tests be50246 [Holden Karau] Calling size on an iterator is not so good if we want to use it after 687ffbc [Holden Karau] This is the it compiles point of replacing Seq with Iterator and JList with JIterator in the groupby and cogroup signatures --- .../org/apache/spark/api/java/JavaPairRDD.scala | 36 ++++++++++---------- .../org/apache/spark/api/java/JavaRDDLike.scala | 6 ++-- .../org/apache/spark/rdd/PairRDDFunctions.scala | 39 ++++++++++++---------- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 6 ++-- .../test/java/org/apache/spark/JavaAPISuite.java | 20 ++++++----- .../test/scala/org/apache/spark/FailureSuite.scala | 4 +-- .../scala/org/apache/spark/PipedRDDSuite.scala | 2 +- .../apache/spark/rdd/PairRDDFunctionsSuite.scala | 12 +++---- .../collection/ExternalAppendOnlyMapSuite.scala | 4 +-- 9 files changed, 67 insertions(+), 62 deletions(-) (limited to 'core/src') diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 9596dbaf75..e6c5d85917 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -18,6 +18,7 @@ package org.apache.spark.api.java import java.util.{Comparator, List => JList} +import java.lang.{Iterable => JIterable} import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -250,14 +251,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Allows controlling the * partitioning of the resulting key-value pair RDD by passing a Partitioner. */ - def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] = + def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] = fromRDD(groupByResultToJava(rdd.groupByKey(partitioner))) /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with into `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] = + def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] = fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions))) /** @@ -367,7 +368,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with the existing partitioner/parallelism level. */ - def groupByKey(): JavaPairRDD[K, JList[V]] = + def groupByKey(): JavaPairRDD[K, JIterable[V]] = fromRDD(groupByResultToJava(rdd.groupByKey())) /** @@ -462,7 +463,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner) - : JavaPairRDD[K, (JList[V], JList[W])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner))) /** @@ -470,14 +471,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], - partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner))) /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = + def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other))) /** @@ -485,7 +486,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2))) /** @@ -493,7 +494,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int) - : JavaPairRDD[K, (JList[V], JList[W])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) /** @@ -501,16 +502,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions))) /** Alias for cogroup. */ - def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = + def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] = fromRDD(cogroupResultToJava(rdd.groupWith(other))) /** Alias for cogroup. */ def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] = fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2))) /** @@ -695,21 +696,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) object JavaPairRDD { private[spark] - def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Seq[T])]): RDD[(K, JList[T])] = { - rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList) + def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = { + rddToPairRDDFunctions(rdd).mapValues(asJavaIterable) } private[spark] def cogroupResultToJava[K: ClassTag, V, W]( - rdd: RDD[(K, (Seq[V], Seq[W]))]): RDD[(K, (JList[V], JList[W]))] = { - rddToPairRDDFunctions(rdd).mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2))) + rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = { + rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2))) } private[spark] def cogroupResult2ToJava[K: ClassTag, V, W1, W2]( - rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]): RDD[(K, (JList[V], JList[W1], JList[W2]))] = { + rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]) + : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = { rddToPairRDDFunctions(rdd) - .mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3))) + .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3))) } def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 6e8ec8e0c7..ae577b500c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -17,7 +17,7 @@ package org.apache.spark.api.java -import java.util.{Comparator, Iterator => JIterator, List => JList} +import java.util.{Comparator, List => JList, Iterator => JIterator} import java.lang.{Iterable => JIterable} import scala.collection.JavaConversions._ @@ -204,7 +204,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = { + def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = { implicit val ctagK: ClassTag[K] = fakeClassTag implicit val ctagV: ClassTag[JList[T]] = fakeClassTag JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag))) @@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = { + def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = { implicit val ctagK: ClassTag[K] = fakeClassTag implicit val ctagV: ClassTag[JList[T]] = fakeClassTag JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K]))) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 14386ff5b9..a92a84b534 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -261,7 +261,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Allows controlling the * partitioning of the resulting key-value pair RDD by passing a Partitioner. */ - def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { + def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. @@ -270,14 +270,14 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2 val bufs = combineByKey[ArrayBuffer[V]]( createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false) - bufs.asInstanceOf[RDD[(K, Seq[V])]] + bufs.mapValues(_.toIterable) } /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with into `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = { + def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = { groupByKey(new HashPartitioner(numPartitions)) } @@ -298,7 +298,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) */ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => - for (v <- vs.iterator; w <- ws.iterator) yield (v, w) + for (v <- vs; w <- ws) yield (v, w) } } @@ -311,9 +311,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (ws.isEmpty) { - vs.iterator.map(v => (v, None)) + vs.map(v => (v, None)) } else { - for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w)) + for (v <- vs; w <- ws) yield (v, Some(w)) } } } @@ -328,9 +328,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) : RDD[(K, (Option[V], W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (vs.isEmpty) { - ws.iterator.map(w => (None, w)) + ws.map(w => (None, w)) } else { - for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w) + for (v <- vs; w <- ws) yield (Some(v), w) } } } @@ -358,7 +358,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with the existing partitioner/parallelism level. */ - def groupByKey(): RDD[(K, Seq[V])] = { + def groupByKey(): RDD[(K, Iterable[V])] = { groupByKey(defaultPartitioner(self)) } @@ -453,7 +453,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) + : RDD[(K, (Iterable[V], Iterable[W]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -468,13 +469,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner) cg.mapValues { case Seq(vs, w1s, w2s) => - (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]) + (vs.asInstanceOf[Seq[V]], + w1s.asInstanceOf[Seq[W1]], + w2s.asInstanceOf[Seq[W2]]) } } @@ -482,7 +485,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, defaultPartitioner(self, other)) } @@ -491,7 +494,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } @@ -499,7 +502,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, new HashPartitioner(numPartitions)) } @@ -508,18 +511,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { cogroup(other1, other2, new HashPartitioner(numPartitions)) } /** Alias for cogroup. */ - def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, defaultPartitioner(self, other)) } /** Alias for cogroup. */ def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index bf3c57ad41..74fa2a4fcd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -438,20 +438,20 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD of grouped items. */ - def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] = + def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] = groupBy[K](f, defaultPartitioner(this)) /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] = + def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] = groupBy(f, new HashPartitioner(numPartitions)) /** * Return an RDD of grouped items. */ - def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = { + def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] = { val cleanF = sc.clean(f) this.map(t => (cleanF(t), t)).groupByKey(p) } diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 762405be2a..ab2fdac553 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -18,10 +18,12 @@ package org.apache.spark; import java.io.*; +import java.lang.StringBuilder; import java.util.*; import scala.Tuple2; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.base.Optional; import com.google.common.base.Charsets; @@ -197,7 +199,7 @@ public class JavaAPISuite implements Serializable { new Tuple2("Oranges", "Citrus") )); Assert.assertEquals(2, categories.lookup("Oranges").size()); - Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size()); + Assert.assertEquals(2, Iterables.size(categories.groupByKey().lookup("Oranges").get(0))); } @Test @@ -209,15 +211,15 @@ public class JavaAPISuite implements Serializable { return x % 2 == 0; } }; - JavaPairRDD> oddsAndEvens = rdd.groupBy(isOdd); + JavaPairRDD> oddsAndEvens = rdd.groupBy(isOdd); Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens - Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds oddsAndEvens = rdd.groupBy(isOdd, 1); Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens - Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds } @SuppressWarnings("unchecked") @@ -232,9 +234,9 @@ public class JavaAPISuite implements Serializable { new Tuple2("Oranges", 2), new Tuple2("Apples", 3) )); - JavaPairRDD, List>> cogrouped = categories.cogroup(prices); - Assert.assertEquals("[Fruit, Citrus]", cogrouped.lookup("Oranges").get(0)._1().toString()); - Assert.assertEquals("[2]", cogrouped.lookup("Oranges").get(0)._2().toString()); + JavaPairRDD, Iterable>> cogrouped = categories.cogroup(prices); + Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); + Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2())); cogrouped.collect(); } diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index f3fb64d87a..12dbebcb28 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -72,7 +72,7 @@ class FailureSuite extends FunSuite with LocalSparkContext { throw new Exception("Intentional task failure") } } - (k, v(0) * v(0)) + (k, v.head * v.head) }.collect() FailureSuiteState.synchronized { assert(FailureSuiteState.tasksRun === 4) @@ -137,5 +137,3 @@ class FailureSuite extends FunSuite with LocalSparkContext { // TODO: Need to add tests with shuffle fetch failures. } - - diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala index 627e9b5cd9..867b28cc0d 100644 --- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala @@ -85,7 +85,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext { (f: String => Unit) => { bl.value.map(f(_)); f("\u0001") }, - (i: Tuple2[String, Seq[String]], f: String => Unit) => { + (i: Tuple2[String, Iterable[String]], f: String => Unit) => { for (e <- i._2) { f(e + "_") } diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index f9e994b13d..8f3e6bd21b 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -225,11 +225,12 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.groupWith(rdd2).collect() assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))), - (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))), - (3, (ArrayBuffer(1), ArrayBuffer())), - (4, (ArrayBuffer(), ArrayBuffer('w'))) + val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList))).toSet + assert(joinedSet === Set( + (1, (List(1, 2), List('x'))), + (2, (List(1), List('y', 'z'))), + (3, (List(1), List())), + (4, (List(), List('w'))) )) } @@ -447,4 +448,3 @@ class ConfigTestFormat() extends FakeFormat() with Configurable { super.getRecordWriter(p1) } } - diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index fce1184d46..cdebefb675 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -174,9 +174,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { assert(result1.toSet == Set[(Int, Int)]((0, 5), (1, 5))) // groupByKey - val result2 = rdd.groupByKey().collect() + val result2 = rdd.groupByKey().collect().map(x => (x._1, x._2.toList)).toSet assert(result2.toSet == Set[(Int, Seq[Int])] - ((0, ArrayBuffer[Int](1, 1, 1, 1, 1)), (1, ArrayBuffer[Int](1, 1, 1, 1, 1)))) + ((0, List[Int](1, 1, 1, 1, 1)), (1, List[Int](1, 1, 1, 1, 1)))) } test("simple cogroup") { -- cgit v1.2.3