From 31e92b72e31910be1694c348ab5de8b14f2df44b Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 24 Oct 2013 21:14:56 -0700 Subject: Adding Java versions and associated tests --- .../org/apache/spark/api/java/JavaDoubleRDD.scala | 11 +++++++++++ .../org/apache/spark/api/java/JavaPairRDD.scala | 11 +++++++++++ .../scala/org/apache/spark/api/java/JavaRDD.scala | 11 +++++++++++ core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- .../test/scala/org/apache/spark/JavaAPISuite.java | 21 +++++++++++++++++++++ 5 files changed, 55 insertions(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index f9b6ee351a..043cb183ba 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -93,6 +93,17 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions, shuffle)) + /** + * Return a new RDD that has exactly numPartitions partitions. + * + * Can increase or decrease the level of parallelism in this RDD. Internally, this uses + * a shuffle to redistribute data. + * + * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, + * which can avoid performing a shuffle. + */ + def repartition(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.repartition(numPartitions)) + /** * Return an RDD with the elements from `this` that are not in `other`. * diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 268f43b4e8..39f408b8c8 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -107,6 +107,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions, shuffle)) + /** + * Return a new RDD that has exactly numPartitions partitions. + * + * Can increase or decrease the level of parallelism in this RDD. Internally, this uses + * a shuffle to redistribute data. + * + * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, + * which can avoid performing a shuffle. + */ + def repartition(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.repartition(numPartitions)) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 662990049b..3b359a8fd6 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -81,6 +81,17 @@ JavaRDDLike[T, JavaRDD[T]] { def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] = rdd.coalesce(numPartitions, shuffle) + /** + * Return a new RDD that has exactly numPartitions partitions. + * + * Can increase or decrease the level of parallelism in this RDD. Internally, this uses + * a shuffle to redistribute data. + * + * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, + * which can avoid performing a shuffle. + */ + def repartition(numPartitions: Int): JavaRDD[T] = rdd.repartition(numPartitions) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 17bc2515f2..6e88be6f6a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -268,7 +268,7 @@ abstract class RDD[T: ClassManifest]( /** * Return a new RDD that has exactly numPartitions partitions. * - * Used to increase or decrease the level of parallelism in this RDD. This will use + * Can increase or decrease the level of parallelism in this RDD. Internally, this uses * a shuffle to redistribute data. * * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 7b0bb89ab2..f38c607d65 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -472,6 +472,27 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); } + @Test + public void repartition() { + // Shrinking number of partitions + JavaRDD in1 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 2); + JavaRDD repartitioned1 = in1.repartition(4); + List> result1 = repartitioned1.glom().collect(); + Assert.assertEquals(4, result1.size()); + for (List l: result1) { + Assert.assertTrue(l.size() > 0); + } + + // Growing number of partitions + JavaRDD in2 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 4); + JavaRDD repartitioned2 = in2.repartition(2); + List> result2 = repartitioned2.glom().collect(); + Assert.assertEquals(2, result2.size()); + for (List l: result2) { + Assert.assertTrue(l.size() > 0); + } + } + @Test public void persist() { JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0)); -- cgit v1.2.3