aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Mark Hamstra <markhamstra@gmail.com> 2012-12-24 13:18:45 -0800
committer Mark Hamstra <markhamstra@gmail.com> 2012-12-24 13:18:45 -0800
commit 903f3518dfcd686cda2256b07fbc1dde6aec0178 (patch)
tree 00c7bd83e157fde54435b87c59dc99e7131ecf13
parent b575cbe0691df5d17c84acfaa36ea82b74324222 (diff)
download spark-903f3518dfcd686cda2256b07fbc1dde6aec0178.tar.gz
spark-903f3518dfcd686cda2256b07fbc1dde6aec0178.tar.bz2
spark-903f3518dfcd686cda2256b07fbc1dde6aec0178.zip
fall back to filter-map-collect when calling lookup() on an RDD without a partitioner
-rw-r--r-- core/src/main/scala/spark/PairRDDFunctions.scala 2
-rw-r--r-- core/src/test/scala/spark/JavaAPISuite.java 11
2 files changed, 12 insertions, 1 deletion
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 08ae06e865..d3e206b353 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -438,7 +438,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
val res = self.context.runJob(self, process _, Array(index), false)
res(0)
case None =>
- throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner")
+ self.filter(_._1 == key).map(_._2).collect
}
}
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 46a0b68f89..33d5fc2d89 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -131,6 +131,17 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void lookup() {
+ JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, String>("Apples", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Citrus")
+ ));
+ Assert.assertEquals(2, categories.lookup("Oranges").size());
+ Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size());
+ }
+
+ @Test
public void groupBy() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function<Integer, Boolean> isOdd = new Function<Integer, Boolean>() {